当前位置: 首页 > news >正文

[Kubernetes] - RabbitMQ学习

1.消息队列

  • 消息: 在应用间传送的数据
  • 队列,先进先出

1.2. 作用

  • 好处:解耦, 容错,削峰
  • 坏处:降低系统可用性,系统复杂度提高,一致性问题;

RabbitMQ组成部分:生产者,消费者,队列,交换机;

2. 安装部署rabbitmq

---
apiVersion: v1
kind: Secret
metadata:name: rabbitmq-secretnamespace: rabbitmq
data:username: YWRtaW4Kpassword: MTIzNDU2Cg==
type: Opaque
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: rabbitmqnamespace: rabbitmqlabels:app: rabbitmq
spec:replicas: 1selector:matchLabels:app: rabbitmqserviceName: rabbitmq-headlesstemplate:metadata:labels:app: rabbitmqspec:containers:- name: rabbitmqimage: registry.cn-hangzhou.aliyuncs.com/yuanli123/rabbitmq:3.9.22-managementports:- name: tcp-5672containerPort: 5672protocol: TCP- name: tcp-15672containerPort: 15672protocol: TCP
#              不知道为什么自己使用的username会多出一个回车字符导致rabbitmq无法识别到
#          env:
#            - name: RABBITMQ_DEFAULT_USER
#              valueFrom:
#                secretKeyRef:
#                  name: rabbitmq-secret
#                  key: username
#            - name: RABBITMQ_DEFAULT_PASS
#              valueFrom:
#                secretKeyRef:
#                  name: rabbitmq-secret
#                  key: passwordresources:limits:cpu: '1'memory: '2Gi'requests:cpu: '200m'memory: '500Mi'imagePullSecrets:- name: regcred---
apiVersion: v1
kind: Service
metadata:name: rabbitmq-headlessnamespace: rabbitmqlabels:app: rabbitmq
spec:ports:- name: tcp-rabbitmq-5672port: 5672targetPort: 5672nodePort: 32672selector:app: rabbitmqtype: NodePort
---
apiVersion: v1
kind: Service
metadata:name: rabbitmq-externalnamespace: rabbitmqlabels:app: rabbitmq-external
spec:ports:- name: http-rabbitmq-externalprotocol: TCPport: 15672targetPort: 15672selector:app: rabbitmqtype: ClusterIP---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:name: rabbitmq-ingressnamespace: rabbitmqannotations:nginx.ingress.kubernetes.io/rewrite-target: /
spec:ingressClassName: nginxrules:- host: rabbitmq.liyuan.comhttp:paths:- backend:service:name: rabbitmq-externalport:number: 15672pathType: Prefixpath: /

根据上述yaml,再结合修改 /etc/hosts 文件
在这里插入图片描述
通过 http://rabbitmq.liyuan.com:30001/#/exchanges 访问
在这里插入图片描述
并暴露了 192,168.31.175:32672 用于发消息

2.1.名词解释

  • Broker: 接收和分发消息的应用
  • Virtual Host: 虚拟主机,一个Broker可以有多个Virtual Host, 每个Virtual Host都有自己一套的Exchange和Queue
  • Connection: 生产者/消费者和Broker之间的TCP链接
  • Channel: 发送消息的通道,channel是在connection内部建立逻辑链接,AMQP method包含了channel id帮助客户端和message Broker识别Broker,减少建立TCP Connection的开销;
  • Exchange:message到达broker的第一站,根据分发规则,查询表中的routing key,分发消息到queue中去,常用类型有:direct, topic, fanout(multicast)
  • Queue: 存放消息的队列
  • Binding:Exchange和Queue之间的虚拟链接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据;

3.使用测试

rabbitmq-test 源码参考

3.1.pom.xml

# pom.yaml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>rqbbitmq-test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency></dependencies>
</project>

3.2.生产者Producer

// Producer.java
package com.liyuan.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.31.175");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setPort(32672);try (Connection connection = connectionFactory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "xc_exchange_name";AMQP.Exchange.DeclareOk exchangeDeclare = channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);String queueName = "xc_queue_name";AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, queueName);String message = "Hello, my name is liyuan.";channel.basicPublish(exchangeName, queueName, null, message.getBytes());channel.close();}}
}

3.3.消费者Consumer

package com.liyuan.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.31.175");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setPort(32672);try (Connection connection = connectionFactory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "xc_exchange_name";String queueName = "xc_queue_name";DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("Delivered consuming: " + consumerTag + " " + new String(message.getBody()));}};CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("Canceled: " + consumerTag);}};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);channel.close();}}
}

4.rabbitmq交换机类型

4.1. BuiltinExchangeType.DIRECT

路由键与队列完全匹配交换机,通过routingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,根据消息的routingKey来进行匹配,只将消息发送到完全匹配此routingKey的队列;
且同一个key可以绑定多个queue,因此会同时将消息发给多个queue;

queueNameroutingKey
queue01“direct_key01”
queue02“direct_key02”

根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;

routingKeyqueueName
“direct_key01”queue01 will receive
“direct_key02”queue02 will receive

4.2. BuiltinExchangeType.FANOUT

将消息分发给所有绑定了此交换机的队列;

queueNameroutingKey
queue01“fanout_key01”
queue02“fanout_key01”
queue03“fanout_key01”

根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;

routingKeyqueueName
“fanout_key01”queue01, queue02, queue03 will receive

4.3. BuiltinExchangeType.TOPIC

类似于direct方式,但是topic可以模糊匹配routingKey;通过此种方式,我们可以使得一个队列模糊绑定多个routingKey;

queueNameroutingKey
queue01key1.key2.key3.*
queue02key1.#
queue03*.key2.*.key4
queue04#.key3.key4
  • #:代表0个或多个部分
  • *: 代表一个部分
  • . : 用于分隔不同的routingKey;

根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;

routingKeyqueueName
“key1”queue02 will receive
“key3”no queue will receive
“key1.key2.key3”queue02 will receive
“key1.key2.key3.key4”queue01, queue02, queue03, queue04 will receive

4.3.BuiltinExchangeType.HEADERS

headers 匹配AMQP消息的header而不是路由键,此外headers交换器和direct交换器完全一致,但性能差了很多;
消费方要求指定的headers中必须包含一个"x-match"的键;

  • x-match = all,表示所有的键值对都匹配才能接收到消息;
  • x-match = any,表示只要有键值对匹配就能接收到消息;

生产者按照 x-match 配置的规则发送消息到指定的queue上;

queueNamex-match
queue01{name:“liyuan01”, sex:“male”, x-match: all}
queue02{name:“liyuan02”, sex:“male”, x-match: any}

通过上述规则,按照指定的消息头发送消息时;

x-matchqueueName
{name: “liyuan01”}no queue will receive
{name: “liyuan02”}queue02 will receive
{name: “liyuan01”, sex:“male”}queue02, queue01 will receive
http://www.lryc.cn/news/91069.html

相关文章:

  • swagger页面 doc.html出不来,swagger-ui/index.html能出来
  • IEEE802.3和IEEE802.11的分类(仅为分类)
  • c# cad二次开发通过获取excel数据 在CAD绘图,将CAD属性导出到excel
  • LLM之高性能向量检索库
  • 实体类注解
  • 常见数据结构种类
  • linux高级---k8s中的五种控制器
  • 记一次udp服务性能优化经历
  • uniapp和VueI18n多语言H5项目语言国际化功能搭建流程
  • C# | 凸包算法之Jarvis,寻找一组点的边界/轮廓
  • SpringBoot接收请求参数的方式
  • MKS SERVO4257D 闭环步进电机_系列5 CAN指令说明
  • 安捷伦E4440A(Agilent) e4440a 3HZ-26.5G频谱分析仪
  • 华为OD机试真题 Java 实现【最长子字符串的长度】【2022Q4 100分】,附详细解题思路
  • 【iOS】--对象的底层结构
  • 高并发内存池设计_内存池
  • 给编程初学者的一封信
  • 【无功优化】基于改进教与学算法的配电网无功优化【IEEE33节点】(Matlab代码时候)
  • 数据在内存中的存储(超详细讲解)
  • log4cplus使用示例
  • 人工智能学习07--pytorch20--目标检测:COCO数据集介绍+pycocotools简单使用
  • learnOpenGL-深度测试
  • 阿里云服务器数据盘是什么?系统盘和数据盘区别
  • linux常用命令精选
  • 人体行为足力特征分析及其应用研究_kaic
  • javascript基础二十七:说说 JavaScript 数字精度丢失的问题,解决方案?
  • 重塑工作场所:后疫情时代组织韧性的8个策略
  • TCP协议为什么要三次握手而不是两次?
  • 使用Vuex进行状态管理
  • 【优化调度】基于改进遗传算法的公交车调度排班优化的研究与实现(Matlab代码实现)