当前位置: 首页 > news >正文

2、RabbitMQ的5种模式基本使用(Maven项目)

本文案例中的代码没有对队列和消息设置为持久化,在实际开发中应该把队列和消息设置为持久化

作为一名开发者,主要关心的就是生产者如何发送消息,消费者如何消费消息。

1、创建普通maven项目

1.1 创建项目

1.2 父pom导入依赖

    <!--  rabbitmq  --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><!-- 不导入这个依赖,控制台会报错   --><!-- slf4j   --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.2</version></dependency>

1.3 maven查看

2、简单模式

简单模式实现Hello World

2.1 生产者

先编写生产者,想发送消息:

  1. 创建连接工厂对象(connectFactory)
  2. 设置rabbitmq的Ip地址
  3. 用连接工厂创建连接对象Connection
  4. 用连接对象创建channel
  5. 设置channel属性,例如队列名字,是否持久化等等
  6. channel调用basicPublish方法发送消息
  7. 关闭消息

2.1.1 代码实现

/***  生产者,生产消息 Hello World*/
@Slf4j
public class Producer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等/*** 第一个参数:队列名字* 第二个参数:是否持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 6.channel调用basicPublish方法发送消息/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish("", QUEUE_NAME, null, "Hello RabbitMQ".getBytes());// 7.关闭消息channel.close();connection.close();}}

2.1.2 实现效果

运行生产者后,看mq界面的queue


 

2.2 消费者

在编写消费者,签收消费消息:

  1. 创建连接工厂对象(connectFactory)
  2. 设置rabbitmq的Ip地址
  3. 用连接工厂创建连接对象Connection
  4. 用连接对象创建channel
  5. 设置channel属性,例如队列名字,是否持久化等等
  6. channel调用basicPublish方法发送消息
  7. 关闭消息

2.2.1 代码实现

/***  消费者,接收消息 Hello World*/
public class Consumer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等/*** 第一个参数:队列名字* 第二个参数:是否持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 6.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("消费者接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者取消消费的回调这个方法");}});}}

2.2.2 实现效果

 执行消费者

2.3 生产者中用到的属性

属性的设置,生产者和消费者要保持一致

2.3.1 持久化

Queues界面对应的消息Features列无D,意味着当前队列未持久化。

没有持久化,关掉消费者并且重新启动mq,消息队列会消失不见

// Producer类和Consumer类保持一致
/*** 第一个参数:队列名字* 第二个参数:是否持久化,设置为true,即持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);

2.3.2 排他性

设置为true,只能在一个连接对象,才能操作队列。生产者会创建一个连接对象,消费者也会创建一个连接对象,两个对象同时操作队列,会产生问题。开发中很少会创建于一个连接对象,一般是false

 如果设置为true,只能在一个类中同时写生产者和消费者代码// Producer类和Consumer类保持一致
/*** 第一个参数:队列名字* 第二个参数:是否持久化,设置为true,即持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(QUEUE_NAME, true, true, false, null);

2.3.3 自动删除

设置为true,如果没有消费者连接这个队列(断开消费者连接),就自动删除掉

// Producer类和Consumer类保持一致
/*** 第一个参数:队列名字* 第二个参数:是否持久化,设置为true,即持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(QUEUE_NAME, true, false, true, null);

2.3.4 消息持久化

队列持久化和消息持久化不是一个东西。

  • 队列持久化,保证队列重启是存在的,
  • 消息持久化是队列重启,消息不一定存在

/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/
// 设置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN 是程序提供,不是人为编辑
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello RabbitMQ".getBytes());

2.3.5 消息内容

rabbitmq发送的消息内容是字符串,但并不是简简单单的字符串,可以把对象、集合等等转成字符串

3、Work模式(工作模式)

解决简单模式所产生的问题:生产者生产的消息,消费者来不及处理,导致消息中间件累积消息,为了避免消息中间件累计消息,多添加消费者进行消息接收。

3.1 生产者

3.1.1 代码实现

/***  生产者,生产消息 Hello RabbitMQ*/
public class Producer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等/*** 第一个参数:队列名字* 第二个参数:是否持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);// 6.channel调用basicPublish方法发送消息/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/for (int i = 0; i < 20; i++) {String mesage = "Hello RabbitMQ" + i;channel.basicPublish("", WORK_QUEUE_NAME, null, mesage.getBytes());}// 7.关闭消息channel.close();connection.close();}}

3.1.2 实现效果

3.2 消费者

work模式,有两个消费者

3.2.1 代码实现

/***  消费者01,接收消息 Hello RabbitMQ*/
public class ConsumerOne {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等/*** 第一个参数:队列名字* 第二个参数:是否持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);// 6.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(WORK_QUEUE_NAME, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者01接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者01取消消费的回调这个方法");}});}}

/***  消费者02,接收消息 Hello RabbitMQ*/
public class ConsumerTwo {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等/*** 第一个参数:队列名字* 第二个参数:是否持久化* 第三个参数:是否排他性(设置true独占队列)* 第四个参数:是否自动删除(设置true就删除)* 第五个参数:是否设置其他额外参数*/channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);// 6.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(WORK_QUEUE_NAME, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者02接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者02取消消费的回调这个方法");}});}}

3.2.2 实现效果

消费1,执行的速度快

消费2,执行的速度慢

两个消费者,虽然消费的速度不同,但是处理消息数量进行平分

3.2.3 优化Work模式问题

优化:

  1. 设置为预期
  2. 签收改为手动签收
/***  消费者01,接收消息 Hello RabbitMQ*/
public class ConsumerOne {public static void main(String[] args) throws IOException, TimeoutException {........// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);// 预取,告诉rabbitmq,在从mq中取消息的时候,一次取多少条// 设置为1,一次只能拿1条数据channel.basicQos(1);// 6.使用channel去rabbitmq中取消息进行消费channel.basicConsume(WORK_QUEUE_NAME, false, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者01接收到的消息是:" + new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {public void handle(String consumerTag)  {System.out.println("消费者01取消消费的回调这个方法");}});}}
/***  消费者02,接收消息 Hello RabbitMQ*/
public class ConsumerTwo {public static void main(String[] args) throws IOException, TimeoutException {........// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性,例如队列名字,是否持久化等等channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);// 预取,告诉rabbitmq,在从mq中取消息的时候,一次取多少条// 设置为1,一次只能拿1条数据channel.basicQos(1);// 6.使用channel去rabbitmq中取消息进行消费channel.basicConsume(WORK_QUEUE_NAME, false, new DeliverCallback() {public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者02接收到的消息是:" + new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {public void handle(String consumerTag)  {System.out.println("消费者02取消消费的回调这个方法");}});}}

3.3 消费者签收模式说明

用户做下单逻辑,生产者进行发消息,把消息存储到中间件,消费者从中间件取消息,立马进行签收,然后执行加积分逻辑,但是加积分操作失败了,消费者把消息签收了,mq把消息删除了,造成数据不一致。

签收应该在业务逻辑执行成功后,消费者手动签收消息。自动签收可能会存在数据不一致情况,手动签收即使是失败了,也能让生产者重新投递

4、Pub/Sub(发布/订阅模式)

生产者生产消息,多个消费者可同时消费消息。适用于发送短信、发送邮件等操作

P:生产者,发送消息的程序,但是不在发送到队列中,而是发给X(交换机)。

C:消费者,消息的接收者,会一直等待消息到来。

Queue:消息队列,接收消息,缓存消息。

Exchange:交换机(X),一方面,接收生产者发送的消息,另一方面,知道如何处理消息,例如递给某个特别队列,递给所有队列,或者将消息丢弃,到底如何操作,取决于Exchange的类型

Exchange有常见以下3种类型:

  • Fanout:广播,把消息交给所有绑定到交换机的队列(Pub/Sub模式使用)
  • Direct:定向,把消息交给符合指定routing key 的队列(Routing模式使用)
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topic模式使用)

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

4.1 代码实现

4.1.1 生产者

/***  生产者,生产消息 Hello RabbitMQ*/
public class Producer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 6.channel调用basicPublish方法发送消息/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/
//        String message = "info: Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, QUEUE_MESSAGE.getBytes());// 7.关闭消息channel.close();connection.close();}}

4.1.2 消费者1

/***  消费者,接收消息 Hello RabbitMQ*/
public class ConsumerOne {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, EXCHANGE_NAME, "");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者01接收到的消息是:" + new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者01取消消费的回调这个方法");}});}}

4.1.3 消费者2

/***  消费者02,接收消息 Hello RabbitMQ*/
public class ConsumerTwo {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, EXCHANGE_NAME, "");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费者02接收到的消息是:" + new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者02取消消费的回调这个方法");}});}}

4.2 实现效果

先启动消费者代码,在启动生产者代码,消费者关闭启动后,队列消失

5、Routing模式(路由模式)

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey

Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routing Key与信息的Routing Key完全一致,才会接收到消息

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key

X:Exchange(交换机),接收生产者的消息,然后把消息递给与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key为error的消息

C2:消费者,其所在队列指定了需要routing key为info、warn、error的消息

Exchange有常见以下3种类型:

  • Fanout:广播,把消息交给所有绑定到交换机的队列(Pub/Sub模式使用)
  • Direct:定向,把消息交给符合指定routing key 的队列(Routing模式使用)
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topic模式使用)

5.1 代码实现

5.1.1 生产者

/***  生产者,生产消息 Hello RabbitMQ*/
public class Producer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(ROUTING_NAME, "direct");// 6.channel调用basicPublish方法发送消息/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish(ROUTING_NAME, "info", null, QUEUE_MESSAGE.getBytes());// 7.关闭消息channel.close();connection.close();}}

5.1.2 消费者1

/***  消费者,接收消息 Hello RabbitMQ*/
public class ConsumerOne {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(ROUTING_NAME, "direct");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, ROUTING_NAME, "info");channel.queueBind(queueName, ROUTING_NAME, "error");channel.queueBind(queueName, ROUTING_NAME, "warn");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("消费者01接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者01取消消费的回调这个方法");}});}}

5.1.3 消费者2

/***  消费者,接收消息 Hello RabbitMQ*/
public class ConsumerTwo {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(ROUTING_NAME, "direct");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, ROUTING_NAME, "trace");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("消费者02接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者02取消消费的回调这个方法");}});}}

5.2 实现效果

先启动消费者,在启动生产者。

生产者的routing key 是info,消费者1的routing key有info,消费者2的routing key没有info

6、Topic模式

Topic类型与Direct相比,都是可以根据Routing Key把消息路由到不同的队列,只不过Topic类型

Exchange可以让队列在绑定Routing Key的时候使用通配符

RoutingKey一般都是有一个或多个单词组成,多个单词之间以"."分割。例如:item.insert

通配符规则:

  • # 匹配一个或多个词
  • * 匹配不多不少恰好1个词

例如:item.# 能够匹配item.insert.abc或item.insert。item.* 只能匹配item.insert

Exchange有常见以下3种类型:

  • Fanout:广播,把消息交给所有绑定到交换机的队列(Pub/Sub模式使用)
  • Direct:定向,把消息交给符合指定routing key 的队列(Routing模式使用)
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topic模式使用)

6.1 代码实现

6.1.1 生产者

/***  生产者,生产消息 Hello RabbitMQ*/
public class Producer {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelChannel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(TOPIC_NAME, "topic");// 6.channel调用basicPublish方法发送消息/*** 第一个参数:交换机名字* 第二个参数:路由key* 第三个参数:消息属性* 第四个参数:消息内容*/channel.basicPublish(TOPIC_NAME, "employee.save", null, QUEUE_MESSAGE.getBytes());// 7.关闭消息channel.close();connection.close();}}

6.1.2 消费者1

/***  消费者,接收消息 Hello RabbitMQ*/
public class ConsumerOne {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(TOPIC_NAME, "topic");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, TOPIC_NAME, "employee.*");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("消费者01 employee接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者01 employee取消消费的回调这个方法");}});}}

6.1.3 消费者2

/***  消费者,接收消息 Hello RabbitMQ*/
public class ConsumerTwo {/*** main方法* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂对象(connectFactory)ConnectionFactory connectFactory = new ConnectionFactory();// 2.设置rabbitmq的Ip地址connectFactory.setHost(HOST);connectFactory.setPort(PORT);connectFactory.setUsername(USER_NAME);connectFactory.setPassword(PASS_WORD);connectFactory.setVirtualHost(VIRTUAL_HOST);// 3.用连接工厂创建连接对象ConnectionConnection connection = connectFactory.newConnection();// 4.用连接对象创建channelfinal Channel channel = connection.createChannel();// 5.设置channel属性/*** 第一个参数:交换机名字* 第二个参数:交换机类型*/channel.exchangeDeclare(TOPIC_NAME, "topic");// 6.消费者绑定队列// 匿名String queueName = channel.queueDeclare().getQueue();/*** 第一个参数:队列名字* 第二个参数:交换机名称* 第三个参数:路由key*/channel.queueBind(queueName, TOPIC_NAME, "user.*");// 7.使用channel去rabbitmq中取消息进行消费/*** 第一个参数:队列名字* 第二个参数:是否自动确认(签收)*/channel.basicConsume(queueName, true, new DeliverCallback() {/*** 当消息从mq中取出来会回调这个函数* 消费者消费消息在handle处理* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param message the delivered message* @throws IOException*/public void handle(String consumerTag, Delivery message) throws IOException {System.out.println("消费者02 user接收到的消息是:" + new String(message.getBody()));}}, new CancelCallback() {/*** 消费者取消消费的回调这个方法* @param consumerTag the <i>consumer tag</i> associated with the consumer*/public void handle(String consumerTag)  {System.out.println("消费者02 user取消消费的回调这个方法");}});}}

6.2 实现效果

先启动消费者,在启动生产者。

生产者的routing key 是employee.save,消费者1的routing key有employee.*,消费者2的routing key没有user.*

官网:RabbitMQ tutorial - Work Queues | RabbitMQ

学习视频:https://www.bilibili.com/video/BV1HM411x7za/?spm_id_from=333.1007.top_right_bar_window_history.content.click&vd_source=94deb0bc72e4dde87140562a29a6c611

http://www.lryc.cn/news/608842.html

相关文章:

  • kafka 是一个怎样的系统?是消息队列(MQ)还是一个分布式流处理平台?
  • Linux常用命令分类总结
  • 井盖识别数据集-2,700张图片 道路巡检 智能城市
  • 本地环境vue与springboot联调
  • ThinkPHP 与 Vue.js 结合的全栈开发模式
  • 十八、Javaweb-day18-前端实战-登录
  • 《前端无障碍设计的深层逻辑与实践路径》
  • 【openlayers框架学习】十一:openlayers实战功能介绍与前端设计
  • K8S几种常见CNI深入比较
  • 企业自动化交互体系的技术架构与实现:从智能回复到自动评论—仙盟创梦IDE
  • ThinkPHP8学习篇(一):安装与配置
  • Go语言--语法基础7--函数定义与调用--自定义函数
  • Mysql深入学习:慢sql执行
  • Docker 国内可用镜像
  • ABP VNext + Quartz.NET vs Hangfire:灵活调度与任务管理
  • [嵌入式embed]C51单片机STC-ISP提示:正在检测目标单片机
  • 深度学习(鱼书)day10--与学习相关的技巧(后两节)
  • LWIP从FreeRTOS到uC/OS-III的适配性改动
  • 第六章第三节 TIM 输出比较
  • 关于Web前端安全防御之安全头配置
  • 位运算在权限授权中的应用及Vue3实践
  • 深入理解Java中String.intern()方法:从原理到并发控制实践
  • ElementUI常用的组件展示
  • 高质量数据集|大模型技术正从根本上改变传统数据工程的工作模式
  • Android 之 串口通信
  • zookeeper分布式锁 -- 读锁和写锁实现方式
  • 【Android】RecyclerView循环视图(2)——动态加载数据
  • 【C 学习】04-了解变量
  • 《volatile 与 synchronized 底层实现与性能比较》
  • 【OD机试题解法笔记】文件缓存系统