当前位置: 首页 > news >正文

RocketMQ mqadmin java springboot python 调用笔记

命令

mqadmin命令列表

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin
The most commonly used mqadmin commands are:updateTopic               Update or create topicdeleteTopic               Delete topic from broker and NameServer.updateSubGroup            Update or create subscription groupsetConsumeMode            Set consume message mode. pull/pop etc.deleteSubGroup            Delete subscription group from broker.updateBrokerConfig        Update broker's configupdateTopicPerm           Update topic permtopicRoute                Examine topic route infotopicStatus               Examine topic Status infotopicClusterList          Get cluster info for topicaddBroker                 Add a broker to specified containerremoveBroker              Remove a broker from specified containerresetMasterFlushOffset    Reset master flush offset in slavebrokerStatus              Fetch broker runtime status dataqueryMsgById              Query Message by IdqueryMsgByKey             Query Message by KeyqueryMsgByUniqueKey       Query Message by Unique keyqueryMsgByOffset          Query Message by offsetqueryMsgTraceById         Query a message traceprintMsg                  Print Message DetailprintMsgByQueue           Print Message DetailsendMsgStatus             Send msg to broker.brokerConsumeStats        Fetch broker consume stats dataproducerConnection        Query producer's socket connection and client versionconsumerConnection        Query consumer's socket connection, client version and subscriptionconsumerProgress          Query consumers's progress, speedconsumerStatus            Query consumer's internal data structurecloneGroupOffset          Clone offset from other group.producer                  Query producer's instances, connection, status, etc.clusterList               List cluster infostopicList                 Fetch all topic list from name serverupdateKvConfig            Create or update KV config.deleteKvConfig            Delete KV config.wipeWritePerm             Wipe write perm of broker in all name server you defined in the -n paramaddWritePerm              Add write perm of broker in all name server you defined in the -n paramresetOffsetByTime         Reset consumer offset by timestamp(without client restart).skipAccumulatedMessage    Skip all messages that are accumulated (not consumed) currentlyupdateOrderConf           Create or update or delete order confcleanExpiredCQ            Clean expired ConsumeQueue on broker.deleteExpiredCommitLog    Delete expired CommitLog filescleanUnusedTopic          Clean unused topic on broker.startMonitoring           Start MonitoringstatsAll                  Topic and Consumer tps statsallocateMQ                Allocate MQcheckMsgSendRT            Check message send response timeclusterRT                 List All clusters Message Send RTgetNamesrvConfig          Get configs of name server.updateNamesrvConfig       Update configs of name server.getBrokerConfig           Get broker config by cluster or special brokergetConsumerConfig         Get consumer config by subscription group namequeryCq                   Query cq command.sendMessage               Send a messageconsumeMessage            Consume messageupdateAclConfig           Update acl config yaml file in brokerdeleteAclConfig           Delete Acl Config Account in brokerclusterAclConfigVersion   List all of acl config version information in clusterupdateGlobalWhiteAddr     Update global white address for acl Config File in brokergetAclConfig              List all of acl config information in clusterupdateStaticTopic         Update or create static topic, which has fixed number of queuesremappingStaticTopic      Update or create static topic, which has fixed number of queuesexportMetadata            Export metadataexportConfigs             Export configsexportMetrics             Export metricshaStatus                  Fetch ha runtime status datagetSyncStateSet           Fetch syncStateSet for target brokersgetBrokerEpoch            Fetch broker epoch entriesgetControllerMetaData     Get controller cluster's metadatagetControllerConfig       Get controller config.updateControllerConfig    Update controller config.electMaster               Re-elect the specified broker as mastercleanBrokerMetadata       Clean metadata of broker on controllerdumpCompactionLog         parse compaction log to messagegetColdDataFlowCtrInfo    get cold data flow ctr infoupdateColdDataFlowCtrGroupConfig addOrUpdate cold data flow ctr group configremoveColdDataFlowCtrGroupConfig remove consumer from cold ctr configsetCommitLogReadAheadMode set read ahead mode for all commitlog files

topicList

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicList  -n localhost:9876
%RETRY%please_rename_unique_group_name
RMQ_SYS_TRANS_HALF_TOPIC
stringRequestTopic
%RETRY%objectRequestConsumer
%RETRY%please_rename_unique_group_name_4
TRANS_CHECK_MAX_TIME_TOPIC
BenchmarkTest
%RETRY%genericRequestConsumer
string-topic
TBW102
rmq_sys_REVIVE_LOG_DefaultCluster
SELF_TEST_TOPIC
%RETRY%string_consumer_newns
SCHEDULE_TOPIC_XXXX
DefaultCluster_REPLY_TOPIC
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23
RMQ_SYS_TRANS_OP_HALF_TOPIC
TopicTest
localhost.localdomain
order-paid-topic
%RETRY%my-group1
user-topic
%RETRY%string_trans_consumer
message-ext-topic
OFFSET_MOVED_EVENT
%RETRY%user_consumer
%RETRY%order-paid-consumer
yeqiang-MS-7B23
DefaultCluster
spring-transaction-topic
%RETRY%stringRequestConsumer
bytesRequestTopic
%RETRY%string_consumer
%RETRY%bytesRequestConsumer
%RETRY%rocketmq-consume-demo-message-ext-consumer

statsAll

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin statsAll  -n localhost:9876 
#Topic                                                            #Consumer Group                                                  #Accumulation      #InTPS     #OutTPS   #InMsg24Hour  #OutMsg24Hour
RMQ_SYS_TRANS_HALF_TOPIC                                          CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
stringRequestTopic                                                stringRequestConsumer                                                       1        0.00        0.00              0              0
TRANS_CHECK_MAX_TIME_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
BenchmarkTest                                                                                                                                 0        0.00                          0    NO_CONSUMER
string-topic                                                      string_consumer                                                           106        0.00        0.00              0              0
string-topic                                                      string_consumer_newns                                                      63        0.00        0.00              0              0
TBW102                                                                                                                                        0        0.00                          0    NO_CONSUMER
rmq_sys_REVIVE_LOG_DefaultCluster                                                                                                             0        0.00                          0    NO_CONSUMER
SELF_TEST_TOPIC                                                                                                                               0        0.00                          0    NO_CONSUMER
SCHEDULE_TOPIC_XXXX                                                                                                                           0        0.00                          0    NO_CONSUMER
DefaultCluster_REPLY_TOPIC                                                                                                                    0        0.00                          0    NO_CONSUMER
rmq_sys_SYNC_BROKER_MEMBER_yeqiang-MS-7B23                                                                                                    0        0.00                          0    NO_CONSUMER
RMQ_SYS_TRANS_OP_HALF_TOPIC                                       CID_RMQ_SYS_TRANS                                                           0        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name                                           252        0.00        0.00              0              0
TopicTest                                                         please_rename_unique_group_name_4                                           0        0.00        0.00              0              0
localhost.localdomain                                                                                                                         0        0.00                          0    NO_CONSUMER
order-paid-topic                                                  order-paid-consumer                                                         1        0.00        0.00              0              0
user-topic                                                        user_consumer                                                               2        0.00        0.00              0              0
message-ext-topic                                                 rocketmq-consume-demo-message-ext-consumer                                  2        0.00        0.00              0              0
OFFSET_MOVED_EVENT                                                                                                                            0        0.00                          0    NO_CONSUMER
yeqiang-MS-7B23                                                                                                                               0        0.00                          0    NO_CONSUMER
DefaultCluster                                                                                                                                0        0.00                          0    NO_CONSUMER
spring-transaction-topic                                          string_trans_consumer                                                      15        0.00        0.00              0              0
bytesRequestTopic                                                 bytesRequestConsumer                                                        0        0.00        0.00              0              0

topicStatus

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     35                      2023-08-25 16:21:35,786
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

Python 生产者:producer.py

from rocketmq.client import Producer, MessagegroupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
TAGS = "tag-my-group1"
KEYS = "key-my-group1-0"
# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# producer.set_session_credentials(
#     accessKey,  # 角色密钥
#     secretKey,  # 角色名称
#     ''
# )
# 启动生产者
producer.start()# 组装消息   topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()

运行

yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ source /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/activate
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012857767267388CFD61230000 35
(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ 

mqadmin查询topic状态

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicStatus  -n localhost:9876 -t string-topic
#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated
yeqiang-MS-7B23                   0     0                     36                      2023-08-28 09:03:35,722
yeqiang-MS-7B23                   1     0                     52                      2023-08-25 14:55:57,152
yeqiang-MS-7B23                   2     0                     33                      2023-08-25 16:21:35,646
yeqiang-MS-7B23                   3     0                     42                      2023-08-25 14:55:57,172
yeqiang-MS-7B23                   4     0                     1                       2023-08-25 16:21:34,355
yeqiang-MS-7B23                   5     0                     1                       2023-08-25 14:55:57,105
yeqiang-MS-7B23                   6     0                     4                       2023-08-25 16:23:01,489
yeqiang-MS-7B23                   7     0                     1                       2023-08-25 16:21:36,186

yeqiang@yeqiang-MS-7B23:/opt/rocketmq-all-5.1.3-bin-release$ sh bin/mqadmin topicRoute  -n localhost:9876 -t string-topic
{"brokerDatas":[{"brokerAddrs":{0:"10.47.76.67:10911"},"brokerName":"yeqiang-MS-7B23","cluster":"DefaultCluster","enableActingMaster":false}],"filterServerTable":{},"queueDatas":[{"brokerName":"yeqiang-MS-7B23","perm":6,"readQueueNums":8,"topicSysFlag":0,"writeQueueNums":8}]
}

图形工具rocketmq-dashborad

https://github.com/apache/rocketmq-dashboard

自行编译

mvn clean package -Dmaven.test.skip=true

启动

java -Drocketmq.namesrv.addr=127.0.0.1:9876 -jar target/rocketmq-dashboard-1.0.0.jar

 

 

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/producer.py
SendStatus.OK 7F0001012DF4226307248D16C3250000 36

 

 consoumer.py

import time
from rocketmq.client import PushConsumer, ConsumeStatus
# 消息处理回调groupName = "my-group1"
nameserver = "127.0.0.0:9876"
topicName = "string-topic"
KEYS = "key-my-group1-0"def callback(msg):# 模拟业务print('Received message. messageId: ', msg.id, ' body: ', msg.body)# 消费成功回复CONSUME_SUCCESSreturn ConsumeStatus.CONSUME_SUCCESS# 消费成功回复消息状态# return ConsumeStatus.RECONSUME_LATER# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
# consumer.set_session_credentials(
#     accessKey,	 # 角色密钥
#     secretKey,   # 角色名称
#     ''
# )
# 订阅topic
consumer.subscribe(topicName, callback, "*")
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()while True:time.sleep(3600)
# 资源释放
consumer.shutdown()

启动python消费者

(venv) yeqiang@yeqiang-MS-7B23:~/code/pythonproj/python-rocketmq-demo$ /home/yeqiang/code/pythonproj/python-rocketmq-demo/venv/bin/python /home/yeqiang/code/pythonproj/python-rocketmq-demo/consumer.py[Consumer] Waiting for messages.
Received message. messageId:  7F0001012DF4226307248D16C3250000  body:  b'This is a new message.'

可以看到my-group1已被消费 

再启动一个consumer.py,产生一次消息

可以看到,只有一个consumer消费到了消息,说明默认情况下,消息非广播模式。

Java生产一个消息:

training: Java SpringBoot SpringCloud k8s等练习程序 - Gitee.com

 

 

 

python rocketmq依赖

Release rocketmq-client-cpp-2.1.0 · apache/rocketmq-client-cpp · GitHub

python完整程序

python-rocketmq-demo: python3 rocketmq5 的一个例子

http://www.lryc.cn/news/143979.html

相关文章:

  • Java aspose 将HTML导出成Excel文件
  • 原生微信小程序 动态(横向,纵向)公告(广告)栏
  • pandas和polars简单的对比分析
  • Feign远程调用的使用
  • Postman API测试之道:不止于点击,更在于策略
  • 5G 数字乡村数字农业农村大数据中心项目农业大数据建设方案PPT
  • Golang Gorm 一对多的添加
  • 图像扭曲之锯齿
  • 【分布式技术专题】「OSS中间件系列」Minio的文件服务的存储模型及整合Java客户端访问的实战指南
  • 构建个人博客_Obsidian_github.io_hexo
  • 烟花厂人员作业释放静电行为检测算法
  • ARTS挑战第二周-T:PHP数组相关操作
  • 【如何对公司网络进行限速?一个案例详解】
  • 服务器安全-修改默认ssh端口
  • 保护隐私的第一步:从更新浏览器开始
  • Python爬虫框架之快速抓取互联网数据详解
  • 【算法专题突破】双指针 - 盛最多水的容器(4)
  • 循环神经网络(RNN) | 项目还不成熟 |还在初级阶段
  • 【Spring Boot】数据库持久层框架MyBatis — MyBatis简介
  • K8S Nginx Ingress实现金丝雀发布
  • 【C++入门】new和delete(C/C++内存管理)
  • C++设计模式之桥接模式
  • 前端速查速记系列----评论列表
  • hiredis的安装与使用
  • 【InsCode】InsCode打造的JavaSE与Linux命令互融的伪Linux文件系统小项目
  • “深入解析JVM:探索Java虚拟机的内部机制“
  • 内网远程控制总结
  • Excel显示此值与此单元格定义的数据验证限制不匹配怎么办?
  • mysql(八)事务隔离级别及加锁流程详解
  • 华为云Stack的学习(二)