rabbitmq安装、基本使用
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
docker会自己下载,然后运行
进入docker:
docker exec -it rabbitmq bash
进入容器后,重启rabbitmq应用:先执行 rabbitmqctl stop_app,再执行 rabbitmqctl start_app(rabbitmq-server 命令本身没有 restart 子命令)
感觉所有的消息队列都差不多,都是创建,连接,发消息,获得消息
package com.quxiao;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @program: springboot* @author: quxiao* @create: 2023-10-29 09:39**/ public class t1 {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("123");factory.setPassword("123");factory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//连接connection = factory.newConnection("生产者1");//通道channel = connection.createChannel();channel.queueDeclare("duilie1", false, false, false, null);channel.basicPublish("", "duilie1", null, "队列消息".getBytes());} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}} }
消费:
package com.quxiao;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @program: springboot* @author: quxiao* @create: 2023-10-29 10:11**/ public class t2 {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("123");factory.setPassword("123");factory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//连接connection = factory.newConnection("生产者1");//通道channel = connection.createChannel(); // channel.queueDeclare("duilie1", false, false, false, null); // channel.basicPublish("", "duilie1", null, "队列消息".getBytes());channel.basicConsume("duilie1", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(new String(message.getBody(), "UTF-8"));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("接收消息失败");}});} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}} }
路由分组模式:
先定义路由key,并用该key把队列绑定到交换机;之后向交换机发送带有该路由key的消息,消息就会被投递到所有用该key绑定的队列。
package com.quxiao;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @program: springboot* @author: quxiao* @create: 2023-10-29 09:39**/ public class t1 {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127");factory.setPort(5672);factory.setUsername("123");factory.setPassword("123");factory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//连接connection = factory.newConnection("生产者1");//通道channel = connection.createChannel(); // channel.queueDeclare("", false, false, false, null);channel.basicPublish("amq.direct", "type2", null, "队列消息".getBytes()); //21} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);} finally {channel.close();connection.close();}} }