当前位置: 首页 > news >正文

rocketmq 集群环境部署及与spring cloud 集成

1 下载zip 安装包

rocketmq-all-5.1.4-bin-release.zip

2 修改启动配置,防止默认内存配置过高

runserver.sh/runbroker.sh/tools.sh

3 启动namesrv

nohup sh bin/mqnamesrv >>namesrv.log &

4 启动broker+proxy

单点模式:

nohup sh bin/mqbroker -c conf/brock.conf -pc conf/rmq-proxy.json --enable-proxy >>broker.log&

集群模式:2m+2s-async 多主多从,异步复制

A1机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a.properties -pc conf/rmq-proxy-a.json --enable-proxy >>broker-a.log&

B2机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b.properties -pc conf/rmq-proxy-b.json --enable-proxy >>broker-b.log&

B3机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a-s.properties -pc conf/rmq-proxy-a-s.json --enable-proxy >>broker-a-s.log&

A4机器>nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b-s.properties -pc conf/rmq-proxy-b-s.json --enable-proxy >>broker-b-s.log&

5 配置控制面板 rocketmq-dashboard

nohup java -jar -Xms512m -Xmx1G rocketmq-dashboard-1.0.0.jar >> rocketmq-dashboard.log &

注意事项:

  • 所有ip配置均配置本机外网ip
  • 端口不重复,安全组开启相应端口
  • -n 多主机情况,需要增加引号或双引号
  • broker 监听端口,不重复
  • proxy 监听端口,不重复

6 集成到spring cloud 具体服务中:

0 通过rocketmq-dashboard 增加topic

1 增加配置

#NameServer地址

rocketmq.name-server: 192.168.1.1:9876

rocketmq.producer.group: sale-producer-group

2 增加依赖

implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1'

3 消息生产者

import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;import javax.annotation.Resource;@RestController
@AllArgsConstructor
public class SaleMQProducer {private final Logger LOGGER = LoggerFactory.getLogger(SaleMQProducer.class);@Resourceprivate RocketMQTemplate rocketMQTemplate;@RequestMapping(value = "/api/mq/create", method = RequestMethod.POST)Object create(@RequestBody JSONObject req) {return Mono.defer(() -> {String topic = req.getString("topic");String tag = req.getString("tag");String body = req.getString("body");LOGGER.info("topic={},tag={},body={}", topic, tag, body);//支持带tag发送String dest = String.format("%s:%s", topic, tag);rocketMQTemplate.convertAndSend(dest, body);return Mono.just(true);});}
}

4 消息消费者

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "sale-consumer-group", topic = "TEST_FIRST")
public class SaleMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);}
}

5 消息消费者,增加tag匹配(默认* 全量匹配)

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(consumerGroup = "sale-consumer-group-tag-a", topic = "TEST_FIRST", selectorExpression = "TagA")
public class SaleMQConsumerTagA implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);}
}

http://www.lryc.cn/news/248712.html

相关文章:

  • SpringBoot——配置及原理
  • fiddler抓包安卓
  • Maven 进阶学习指南---setting详解
  • 人工智能与我们的生活
  • 前端将blob转换为可下载的url及下载
  • LVS-DR实验
  • MYSQL索引使用注意事项
  • 深入理解Java中的String、StringBuilder和StringBuffer(每天一个技术点,第一天)
  • PHP逻辑运算符学习资料
  • 深入解析CPU工作原理与细节
  • 计算机网络(超详解!) 第二节 物理层(上)
  • c++ 打怪升级
  • 代码随想录第十三天(一刷C语言)|翻转二叉树对称二叉树
  • Temu已成拼多多第二曲线
  • vue+el-tooltip 封装提示框组件,只有溢出才提示
  • GAN:PacGAN-生成对抗网络中两个样本的威力
  • 【面试】typescript
  • 初识向量数据库
  • Zabbix“专家坐诊”第213期问答汇总
  • Linux RN6752 驱动编写
  • 扩展ACL命令
  • 多媒体信号处理复习笔记 --脑图版本
  • 力扣二叉树--第三十五天
  • 先喝点水,这期程序员兼职干货没有水分!
  • vue3通过el-dropdown实现动态菜单切换页面
  • go学习之文件操作与命令行参数
  • 面试题:海量PDF的OCR处理思路
  • [原创][2]探究C#多线程开发细节-“线程的无顺序性“
  • 【精选】Spring整合MyBatis,Junit 及Spring 事务Spring AOP面向切面详解
  • 获取Spring容器Bean工具类