【安装前环境准备】
检查是否安装好JDK(必要):java -version
查看CPU信息:
# cat /proc/cpuinfo
# lscpu
# getconf _NPROCESSORS_ONLN
# cat /sys/devices/system/cpu/online
# cat /proc/interrupts | egrep -i 'cpu查看内存信息:
# free -hm
# cat /proc/meminfo查看磁盘信息:
# lsblk
# fdisk -l - 显示系统中的磁盘分区表信息,包括硬盘的大小、分区类型等
# df -hl
# df -a
# du -sh [目录名]
# du -sm [文件夹]`:查看指定文件夹的总M数
# du -h [目录名]`:查看指定文件夹下的所有文件大小(包含子文件夹)
# ls -lh /opt/install-file/RocketMQ 防火墙端口开放
NameServer端口:9876
Broker端口:10911
Broker高可用(HA)端口:10912
Broker管理端口:10909(通常是10911 - 2,默认不开启)
FastRemoting端口:通常为 Broker 监听端口 + 2(例如 10911 + 2 = 10913)状态:sudo systemctl status firewalld
启动:sudo systemctl start firewalld
查看:firewall-cmd --list-ports --permanent添加防火墙开放访问端口:
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=10912/tcp --permanent加载:firewall-cmd --reload
重启:systemctl restart firewalld
停止:sudo systemctl stop firewalld安装解压工具:yum install -y unzip
创建安装目录:mkdir -p /opt/soft/rocketmq【正式安装 RocketMQ】1.下载:https://dist.apache.org/repos/dist/release/rocketmq/4.9.8/rocketmq-all-4.9.8-bin-release.zip2.解压到指定安装目录:unzip rocketmq-all-4.9.8-bin-release.zip -d /opt/soft/rocketmq/3.重命名文件夹# cd /opt/soft/rocketmq/
# mv rocketmq-all-4.9.8-bin-release rocketmq-all-4.9.8目录结构说明:benchmark :存放的是性能测试脚本bin:可执行文件脚本文件
conf: 存放配置文件的目录lib:其他第三方依赖库LICENSE:授权信息NOTICE:版本公告信息README.md4.配置环境变量执行路径:/opt/soft/rocketmq/rocketmq-all-4.9.8/bin
查看网卡eth0的IP地址:ifconfig# vim /etc/profile
添加 NAMESRV_ADDR 环境变量配置:
# rocketmq config
export NAMESRV_ADDR=192.168.1.210:9876保存生效:source /etc/profile5.修改启动脚本需要修改两个启动脚本:runserver.sh 与 runbroker.sh
脚本位置:/opt/soft/rocketmq/rocketmq-all-4.9.8/bin/
先备份好原来的配置,然后开始修改a.修改runserver.sh脚本: vim runserver.sh找到 choose_gc_options() 函数,根据你的jdk版本修改启动配置参数:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"根据你的机器配置修改java参数,比如:-Xms4g -Xmx4g -Xmn2g 改为 -Xms512m -Xmx512m -Xmn256mb.修改runbroker.sh脚本: vim runbroker.sh# 同样找到 choose_gc_log_directory ,修改函数中的java虚拟机参数配置如:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g" 改为JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"6.修改配置文件# cd /opt/soft/rocketmq/rocketmq-all-4.9.8/conf/
# ll
# vim broker.confbrokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH在文件文本后面添加如下配置:namesrvAddr=192.168.1.210:9876 -- IP地址根据自己的通信地址设置
autoCreateTopicEnable=true
brokerIP1=192.168.1.210其他参数说明:brokerClusterName - MQ集群的名称,我们改为RocketMQ-Cluster。
brokerName - 队列的名字,配置为broker-a。
brokerId - 队列的id,0代表是“主”,其他正整数代表着“从”。
deleteWhen=04 - 代表着commitLog过期了,就会被删除。
fileReservedTime - commitLog的过期时间,单位是小时,这里配置的是48小时。
brokerRole - 队列的角色,ASYNC_MASTER是异步主。
flushDiskType - 保存磁盘的方式,异步保存。配置参数说明:namesrvAddr:nameSrv地址,如果nameSrv与broker在同一台服务器上运行,可以配置为localhost+端口。集群中最好配置为对外提供服务的IP地址+端口
autoCreateTopicEnable:true说明自动创建主题,false则需要手动创建
brokeIP1:这个一定要配置为对外提供服务的IP地址7.启动服务如果机器配置不够,启动前先清一下缓存
清除PageCache页面高速缓存:sudo sync && echo 3 | sudo tee /proc/sys/vm/drop_caches
清除dentries和inodes,即目录项和索引节点:sudo sync && echo 2 > /proc/sys/vm/drop_cachesa.先启动namesrv# cd /opt/soft/rocketmq/rocketmq-all-4.9.8/bin/
后台启动:nohup sh mqnamesrv &
后台带日志启动: nohup sh mqnamesrv > ../namesrv.log 2>&1 &b.然后启动broker:后台启动:nohup sh mqbroker -c ../conf/broker.conf &
后台指定日志启动:nohup sh mqbroker -c ../conf/broker.conf > ../broker.log 2>&1 &8.安装可视化管理控制台参考官方文档下载 rocketmq-dashboard-1.0.0-source-release.zip 源码包解压,按照文档说明进行配置和打包
官方打包指导:https://github.com/apache/rocketmq-dashboard
最终得到可运行的jar包:rocketmq-dashboard-1.0.0.jara.配置rocketmq-dashboard-1.0.0\src\main\resources\application.properties 等
b.使用mvn clean package -Dmaven.test.skip=true 打包获取可运行的jar包:rocketmq-dashboard-1.0.0.jar
c.mkdir -p /opt/soft/rocketmq/rocketmq-dashboard
d.将打包好的jar包上传到新建的 rocketmq-dashboard 目录下
e.进入rocketmq-dashboard目录,启动运行控制台:启动运行控制台:
# nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8090 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > dashboard.log 2>&1 &访问控制台:ip:80909.查看服务启动状态查看进程:
# jps -l
8753 rocketmq-dashboard-1.0.0.jar
407 org.apache.rocketmq.broker.BrokerStartup
32377 org.apache.rocketmq.namesrv.NamesrvStartup
8781 sun.tools.jps.Jps查看端口:
# netstat -npl|grep :8090
tcp6 0 0 :::8090 :::* LISTEN 8753/java
# netstat -npl|grep :9876
tcp6 0 0 :::9876 :::* LISTEN 32377/java
# netstat -npl|grep :10911
tcp6 0 0 :::10911 :::* LISTEN 407/java
# netstat -npl|grep :10912
tcp6 0 0 :::10912 :::* LISTEN 407/java10.测试引入客户端依赖:rocketmq-client<!-- rocketmq-client: 实际生产开发使用 rocketmq-spring-boot-starter 等具体依赖 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.8</version>
</dependency>编写生产者测试代码:import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class TestSender {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {// 1.创建消息生产者producer,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");// 2.指定 nameserver 地址producer.setNamesrvAddr("118.195.219.5:9876");// 设置发送超时时间producer.setSendMsgTimeout(10000);// 3.启动生产者 - producerproducer.start();// 4.构建消息对象Message message = new Message();message.setTopic("myTopic");message.setTags("myTag");message.setBody(("Test MQ from sync-main, 今年666!").getBytes());// 5.发送消息SendResult result = producer.send(message, 10000);String msgId = result.getMsgId();int queueId = result.getMessageQueue().getQueueId();String offsetMegId = result.getOffsetMsgId();long offset = result.getQueueOffset();SendStatus sendStatus = result.getSendStatus();String sendMsg = "同步消息发送状态:"+sendStatus+"\t"+"消息id:"+msgId+"\t 消费者队列id:"+queueId +"\t offsetMegId:"+offsetMegId+"\t offset:"+offset;System.out.println("发送的消息:" + sendMsg);// 6.关闭生产者producer.shutdown();}
}编写消费者测试代码:import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class TestConsumer {private static final Logger log = LoggerFactory.getLogger(TestConsumer.class);public static void main(String[] args) throws MQClientException {// 1.创建消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");// 2.设置NameServerconsumer.setNamesrvAddr("118.195.219.5:9876");// 3.指定订阅的主题和标签consumer.subscribe("myTopic","*");// 4.注册监听器与编写回调函数consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {log.info("Received Message size =>{}", msgList.size());for (MessageExt msg : msgList) {System.out.println("消费端接收到消息主题为: " + msg.getTopic() + "的消息, 队列ID为:" + msg.getQueueId());System.out.println("消费端接收到消息内容为 " + new String(msg.getBody()));}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消费者consumer.start();System.out.printf("MQ Consumer Started.%n");}}运行测试,如果没问题,说明 RocketMQ 安装成功!