
生产者
package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();// 在mq中声明一个交换机/*** 第一个参数:交换机的名字* 第二个参数:交换机的类型,fanout代表该交换机会把收到的消息无差别投递给所有他关联的队列*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String msg = "hello fanout!";/*** 第一个参数,交换机的名字* 第二个参数:如果交换机是 fanout类型的,可以写空串 ;因为fanout类型的交换机会把消息无差别向关联队列投递*/channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));channel.close();conn.close();}
}
消费者1
package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("01:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}
消费者2
package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("02:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}
消费者3
package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs03 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("03:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}