当前位置: 首页 > news >正文

【初始RabbitMQ】了解和安装RabbitMQ

RabbitMQ的概念

RabbitMQ是一个消息中间件:他可以接受并转发消息。例如你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据

四大核心概念

生产者:产生数据发送消息的程序

交换机:交换机是RabbitMQ中的一个重要的部件,一方面它接受来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接受到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,这个得有交换机类型决定

队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但是它们只能存储在的队列中。队列仅受主机内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者:消费者大多数是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在一个机器上。同一个程序既可以是生产者也可以是消费者

RabbitMQ核心部分

各种名词介绍

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是非常大的,效率是很低效的。Channel是在connect内部建立的逻辑连接,如果应用成型鼓支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ的安装

官网地址:rabbitmq.com/download.html

百度网盘大家自取:

链接:https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwd=d0jd 
提取码:d0jd

文件上传

以CentOS系统举例:将以上的软件安装使用命令scp

scp D:\Java学习课件\MQ\软件\rabbitmq-server-3.8.8-1.el7.noarch.rpm root@118.31.6.100:/root

安装文件

rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令(按照以下顺序)

添加开机启动 RabbitMQ 服务

chkconfig rabbitmq-server on

启动服务

/sbin/service rabbitmq-server start

查看服务状态

/sbin/service rabbitmq-server status

 停止服务(选择执行)

/sbin/service rabbitmq-server stop

开启 web 管理插件

rabbitmq-plugins enable rabbitmq_management

用默认账号密码(guest)访问地址 http://真实IP:15672/出现权限问题使用云服务器的一定要把云服务器的防火墙的打开

添加一个新的用户

创建账号

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set permissions-p <vhostpath><user><conf> <write><read>rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

当前用户和角色

rabbitmqctl list_users

利用 admin 用户登录

重置命令

关闭应用的命令

rabbitmqctl stop_app

清除的命令

rabbitmqctl reset

重新启动命令

rabbitmqctl start_app 

Hello World

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.rabbitmq</groupId><artifactId>rabbitmq-hello</artifactId><version>1.0-SNAPSHOT</version><!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.6</version> <!-- 使用最新版本 --></dependency></dependencies></project>

生产者代码:

package com.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者:发消息*/public class Produce {public static final String QUEUE_NAME="hello";public static void main(String[] args) throws IOException, TimeoutException {//创建工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP连接rabbitmqfactory.setHost("118.31.6.132");//用户名factory.setUsername("admin");//密码factory.setPassword("123");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化(磁盘)默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息String message = "hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的KEY值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕!");}
}

消费者代码:

package com.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeoutException;/*** 消费者*/
public class Consume {//队列名称public static final String QUEUE_NAME = "hello";//接受消息public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("118.31.6.132");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明 接受消息DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);}
}

运行结果:

http://www.lryc.cn/news/300867.html

相关文章:

  • Linux第56步_根文件系统第3步_将busybox构建的根文件系统烧录到EMMC
  • Linux进程间通信(三)-----System V消息队列
  • Elasticsearch:混合搜索是 GenAI 应用的未来
  • 态、势、感、知的偏序、全序与无序
  • 【从Python基础到深度学习】 8. VIM两种状态
  • java微服务面试篇
  • OpenAI 生成视频模型 Sora 论文翻译
  • 2.13日学习打卡----初学RocketMQ(四)
  • ZigBee学习——BDB
  • 使用Docker快速部署MySQL
  • 力扣热题100_滑动窗口_3_无重复字符的最长子串
  • RM电控工程讲义
  • 论文阅读:《Deep Learning-Based Human Pose Estimation: A Survey》——Part 1:2D HPE
  • C语言——oj刷题——杨氏矩阵
  • C++ 50道面试题
  • 寒假学习记录14:JS字符串
  • 【数学建模】【2024年】【第40届】【MCM/ICM】【C题 网球运动中的“动量”】【解题思路】
  • 无人驾驶LQR控制算法 c++ 实现
  • Karnaugh map (卡诺图)
  • C# CAD 框选pdf输出
  • 【Linux】 Linux 小项目—— 进度条
  • Sora和Pika,RunwayMl,Stable Video对比!网友:Sora真王者,其他都是弟
  • Go内存优化与垃圾收集
  • 【Spring】Bean 的生命周期
  • 云计算基础-存储基础
  • 问题:人的安全知识和技能是天生的。() #媒体#知识分享#学习方法
  • 【数据分享】2001~2020年青藏高原植被净初级生产力数据集
  • 【Spring MVC篇】返回响应
  • 阿里云BGP多线精品EIP香港CN2线路低时延,价格贵
  • (08)Hive——Join连接、谓词下推