当前位置: 首页 > news >正文

通过多线程的方式每次发送10条MQ消息

背景:传入一个List<person>,不知道list中有多少条数据。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.Message;
import org.apache.rocketmq.client.producer.SendResult;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MultiThreadMessageSender {public static void main(String[] args) {// 设置生产者组名DefaultMQProducer producer = new DefaultMQProducer("your_producer_group");// 设置NameServer地址,多个地址用分号分隔producer.setNamesrvAddr("your_namesrv_address");// 启动生产者实例try {producer.start();// 创建消息集合List<Person> personList = // 从某处获取您的 Person 数据// 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(5); // 这里使用固定大小为5的线程池,您可以根据需要进行调整// 每10条数据为一批,提交到线程池处理for (int i = 0; i < personList.size(); i += 10) {List<Person> subList = personList.subList(i, Math.min(i + 10, personList.size()));// 提交任务到线程池executorService.submit(() -> sendMessages(subList, producer));}// 关闭线程池executorService.shutdown();} catch (Exception e) {e.printStackTrace();} finally {// 关闭生产者实例producer.shutdown();}}private static void sendMessages(List<Person> subList, DefaultMQProducer producer) {try {// 创建消息集合List<Message> messages = new ArrayList<>();// 构造消息for (Person person : subList) {// 将 Person 对象转换为字符串,作为消息内容String messageContent = person.getName() + "," + person.getAge();Message message = new Message("your_topic", "your_tag", messageContent.getBytes());messages.add(message);}// 发送消息SendResult sendResult = producer.send(messages);System.out.println("Thread " + Thread.currentThread().getId() + " Send Result: " + sendResult);} catch (Exception e) {e.printStackTrace();}}
}

在上述示例中,我们使用了Java的ExecutorService线程池来管理线程。每个线程负责处理10条Person对象,将它们转换为RocketMQ消息并发送。这样,多个线程可以并行处理不同的批次,提高了消息发送的效率。

http://www.lryc.cn/news/236721.html

相关文章:

  • springboot上传文件后显示权限不足
  • spring-boot-maven-plugin插件 —— 打包时减小jar包的大小方法
  • java Bigdecimal
  • 【C++11并发】thread 笔记
  • OBS Studio免费开源录屏工具
  • 【汇编】[bx+idata]的寻址方式、SI和DI寄存器
  • Java,集合框架,关于Map接口与Collections工具类
  • 【实用技巧】更改ArduinoIDE默认库文件位置,解放系统盘,将Arduino15中的库文件移动到其他磁盘
  • 二、什么是寄存器
  • 邀请报名|11月24日阿里云原生 Serverless 技术实践营 深圳站
  • 学习UI第一天
  • 1688商品详情原数据(2023年11月最新版)
  • CleanMyMac X2024免费测试版好不好用?值不值得下载
  • Linux操作文件的底层系统调用
  • steam搬砖项目2023年现状分析,到底还能不能做?
  • 【论文阅读】基于隐蔽带宽的汽车控制网络鲁棒认证(二)
  • string类的常用方法
  • Java面向对象(高级)-- 单例(Singleton)设计模式
  • 【Kingbase FlySync】命令模式:安装部署同步软件,实现Oracle到KES实现同步
  • 2311d导入c的语义不同
  • OpenHarmony Meetup北京站招募令
  • C语言——冒泡排序
  • 08.智慧商城——购物车布局、全选反选、功能实现
  • 金属压块液压打包机比例阀放大器
  • python 自动化福音,30行代码手撸ddt模块
  • 基于GATK流程化进行SNP calling
  • 【Java SE】如何解读Java的继承和多态的特性?
  • uniapp 手动调用form表单submit事件
  • 11月20日星期一今日早报简报微语报早读
  • Unity中 Start和Awake的区别