[Kubernetes] - RabbitMQ学习
1.消息队列
- 消息: 在应用间传送的数据
- 队列,先进先出
1.2. 作用
- 好处:解耦, 容错,削峰
- 坏处:降低系统可用性,系统复杂度提高,一致性问题;
RabbitMQ组成部分:生产者,消费者,队列,交换机;
2. 安装部署rabbitmq
---
apiVersion: v1
kind: Secret
metadata:name: rabbitmq-secretnamespace: rabbitmq
data:username: YWRtaW4Kpassword: MTIzNDU2Cg==
type: Opaque
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: rabbitmqnamespace: rabbitmqlabels:app: rabbitmq
spec:replicas: 1selector:matchLabels:app: rabbitmqserviceName: rabbitmq-headlesstemplate:metadata:labels:app: rabbitmqspec:containers:- name: rabbitmqimage: registry.cn-hangzhou.aliyuncs.com/yuanli123/rabbitmq:3.9.22-managementports:- name: tcp-5672containerPort: 5672protocol: TCP- name: tcp-15672containerPort: 15672protocol: TCP
# 不知道为什么自己使用的username会多出一个回车字符导致rabbitmq无法识别到
# env:
# - name: RABBITMQ_DEFAULT_USER
# valueFrom:
# secretKeyRef:
# name: rabbitmq-secret
# key: username
# - name: RABBITMQ_DEFAULT_PASS
# valueFrom:
# secretKeyRef:
# name: rabbitmq-secret
# key: passwordresources:limits:cpu: '1'memory: '2Gi'requests:cpu: '200m'memory: '500Mi'imagePullSecrets:- name: regcred---
apiVersion: v1
kind: Service
metadata:name: rabbitmq-headlessnamespace: rabbitmqlabels:app: rabbitmq
spec:ports:- name: tcp-rabbitmq-5672port: 5672targetPort: 5672nodePort: 32672selector:app: rabbitmqtype: NodePort
---
apiVersion: v1
kind: Service
metadata:name: rabbitmq-externalnamespace: rabbitmqlabels:app: rabbitmq-external
spec:ports:- name: http-rabbitmq-externalprotocol: TCPport: 15672targetPort: 15672selector:app: rabbitmqtype: ClusterIP---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:name: rabbitmq-ingressnamespace: rabbitmqannotations:nginx.ingress.kubernetes.io/rewrite-target: /
spec:ingressClassName: nginxrules:- host: rabbitmq.liyuan.comhttp:paths:- backend:service:name: rabbitmq-externalport:number: 15672pathType: Prefixpath: /
根据上述yaml,再结合修改 /etc/hosts 文件
通过 http://rabbitmq.liyuan.com:30001/#/exchanges 访问
并暴露了 192,168.31.175:32672 用于发消息
2.1.名词解释
- Broker: 接收和分发消息的应用
- Virtual Host: 虚拟主机,一个Broker可以有多个Virtual Host, 每个Virtual Host都有自己一套的Exchange和Queue
- Connection: 生产者/消费者和Broker之间的TCP链接
- Channel: 发送消息的通道,channel是在connection内部建立逻辑链接,AMQP method包含了channel id帮助客户端和message Broker识别Broker,减少建立TCP Connection的开销;
- Exchange:message到达broker的第一站,根据分发规则,查询表中的routing key,分发消息到queue中去,常用类型有:direct, topic, fanout(multicast)
- Queue: 存放消息的队列
- Binding:Exchange和Queue之间的虚拟链接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据;
3.使用测试
rabbitmq-test 源码参考
3.1.pom.xml
# pom.yaml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>rqbbitmq-test</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency></dependencies>
</project>
3.2.生产者Producer
// Producer.java
package com.liyuan.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.31.175");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setPort(32672);try (Connection connection = connectionFactory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "xc_exchange_name";AMQP.Exchange.DeclareOk exchangeDeclare = channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);String queueName = "xc_queue_name";AMQP.Queue.DeclareOk queueDeclare = channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, exchangeName, queueName);String message = "Hello, my name is liyuan.";channel.basicPublish(exchangeName, queueName, null, message.getBytes());channel.close();}}
}
3.3.消费者Consumer
package com.liyuan.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.31.175");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setPort(32672);try (Connection connection = connectionFactory.newConnection()) {Channel channel = connection.createChannel();String exchangeName = "xc_exchange_name";String queueName = "xc_queue_name";DeliverCallback deliverCallback = new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println("Delivered consuming: " + consumerTag + " " + new String(message.getBody()));}};CancelCallback cancelCallback = new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("Canceled: " + consumerTag);}};channel.basicConsume(queueName, true, deliverCallback, cancelCallback);channel.close();}}
}
4.rabbitmq交换机类型
4.1. BuiltinExchangeType.DIRECT
路由键与队列完全匹配交换机,通过routingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,根据消息的routingKey来进行匹配,只将消息发送到完全匹配此routingKey的队列;
且同一个key可以绑定多个queue,因此会同时将消息发给多个queue;
queueName | routingKey |
---|---|
queue01 | “direct_key01” |
queue02 | “direct_key02” |
根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;
routingKey | queueName |
---|---|
“direct_key01” | queue01 will receive |
“direct_key02” | queue02 will receive |
4.2. BuiltinExchangeType.FANOUT
将消息分发给所有绑定了此交换机的队列;
queueName | routingKey |
---|---|
queue01 | “fanout_key01” |
queue02 | “fanout_key01” |
queue03 | “fanout_key01” |
根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;
routingKey | queueName |
---|---|
“fanout_key01” | queue01, queue02, queue03 will receive |
4.3. BuiltinExchangeType.TOPIC
类似于direct方式,但是topic可以模糊匹配routingKey;通过此种方式,我们可以使得一个队列模糊绑定多个routingKey;
queueName | routingKey |
---|---|
queue01 | key1.key2.key3.* |
queue02 | key1.# |
queue03 | *.key2.*.key4 |
queue04 | #.key3.key4 |
- #:代表0个或多个部分
- *: 代表一个部分
- . : 用于分隔不同的routingKey;
根据上述表格的规则来发送消息,当发送消息时的routingKey为以下值时,以下队列会收到消息;
routingKey | queueName |
---|---|
“key1” | queue02 will receive |
“key3” | no queue will receive |
“key1.key2.key3” | queue02 will receive |
“key1.key2.key3.key4” | queue01, queue02, queue03, queue04 will receive |
4.3.BuiltinExchangeType.HEADERS
headers 匹配AMQP消息的header而不是路由键,此外headers交换器和direct交换器完全一致,但性能差了很多;
消费方要求指定的headers中必须包含一个"x-match"的键;
- x-match = all,表示所有的键值对都匹配才能接收到消息;
- x-match = any,表示只要有键值对匹配就能接收到消息;
生产者按照 x-match 配置的规则发送消息到指定的queue上;
queueName | x-match |
---|---|
queue01 | {name:“liyuan01”, sex:“male”, x-match: all} |
queue02 | {name:“liyuan02”, sex:“male”, x-match: any} |
通过上述规则,按照指定的消息头发送消息时;
x-match | queueName |
---|---|
{name: “liyuan01”} | no queue will receive |
{name: “liyuan02”} | queue02 will receive |
{name: “liyuan01”, sex:“male”} | queue02, queue01 will receive |