当前位置: 首页 > news >正文

并发框架disruptor实现生产-消费者模式

 `Disruptor`是LMAX公司开源的高性能内存消息队列,单线程处理能力可达600w订单/秒。本文将使用该框架实现生产-消费者模式。

一、框架的maven依赖

   <!-- https://mvnrepository.com/artifact/com.lmax/disruptor --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>

二、消息事件

package com.monika.main.system.mq;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;import java.util.EventObject;/*** @author:whh* @date: 2024-12-04 20:27* <p></p>*/
public class MsgEvent  {private String data;public String getData() {return data;}public void setData(String data) {this.data = data;}
}

三、消息事件处理器

package com.monika.main.system.mq;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;/*** @author:whh* @date: 2024-12-04 22:28* <p>*     *     * </p>*/
public class MsgEventHandler implements EventHandler<MsgEvent>, WorkHandler<MsgEvent> {private String name;public MsgEventHandler(String name) {this.name = name;}@Overridepublic void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {System.out.println(name+"-----start-----"+sequence);Thread.sleep(1000*10);System.out.println("ThreadName:  "+Thread.currentThread().getName());System.out.println(event.getData()+" end seq: "+sequence);}@Overridepublic void onEvent(MsgEvent event) throws Exception {System.out.println(name+"-----start-----");Thread.sleep(1000*10);System.out.println("ThreadName:  "+Thread.currentThread().getName());System.out.println(event.getData());System.out.println(name+"-----end-----");}
}

该消息处理器实现了两个接口,EventHandler接口,该接口实现统一消费一个消息会被所有消费者消费;WorkHandler接口,该接口实现分组消费一个消息只能被一个消费者消费,多消费者轮询处理。

四、Disruptor配置

package com.monika.main.system.mq;import cn.hutool.core.thread.NamedThreadFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author:whh* @date: 2024-12-04 20:33* <p></p>*/@Configuration
public class RingBufferConfig {@Beanpublic RingBuffer<MsgEvent> ringBuffer(){NamedThreadFactory threadFactory = new NamedThreadFactory("MsgEvent-",true);EventFactory<MsgEvent> eventFactory = new EventFactory<MsgEvent>() {@Overridepublic MsgEvent newInstance() {return new MsgEvent();}};Disruptor<MsgEvent> disruptor = new Disruptor(eventFactory,1024, threadFactory);//定义两个消费者MsgEventHandler m1 = new MsgEventHandler("m1");MsgEventHandler m2 = new MsgEventHandler("m2");//disruptor.handleEventsWith(m1,m2); //统一消费:一个消息会被所有消费者消费disruptor.handleEventsWithWorkerPool(m1,m2);//分组消费:一个消息只能被一个消费者消费,多消费者轮询处理//disruptor.handleEventsWith(m1).then(m2);   //顺序消费:1、3先并行处理,然后2再处理disruptor.start();//配置多消费者,每个消费者将有单独的线程处理return disruptor.getRingBuffer();}
}

五、消息生产者MsgPublish

package com.monika.main.system.mq;import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author:whh* @date: 2024-12-04 20:45* <p></p>*/
@Component
public class MsgPublish {public static void publish(String message){/*** 返回布尔值,表示事件是否发布成功,如果失败可根据此值进行业务逻辑判断*/boolean b = ringBuffer.tryPublishEvent(TRANSLATOR, message);}private static final EventTranslatorOneArg<MsgEvent,String> TRANSLATOR =  new EventTranslatorOneArg<MsgEvent,String>() {@Overridepublic void translateTo(MsgEvent event, long sequence, String arg0) {event.setData(arg0);}};private static RingBuffer<MsgEvent> ringBuffer;@Autowiredpublic  void setRingBuffer(RingBuffer<MsgEvent> ringBuffer) {MsgPublish.ringBuffer = ringBuffer;}
}

六、测试

本次测试使用的是分组模式,可以发现一个消息只能被一个消费者消费,且每个消费者都由单独的线程处理。
在这里插入图片描述

七、总结

本次只是简单的应用disruptor框架实现生产-消费者模式,对于disruptor的原理主要是RingBuffer环形数组,这个咱们后续再进一步研究。

http://www.lryc.cn/news/497207.html

相关文章:

  • 【Vivado】xdc约束文件编写
  • Redis使用场景-缓存-缓存雪崩
  • 概率论相关知识随记
  • 【PlantUML系列】序列图(二)
  • WPF+MVVM案例实战与特效(三十四)- 日志管理:使用 log4net 实现高效日志记录
  • 前端测试框架 jasmine 的使用
  • Qwen2-VL视觉大模型微调实战:LaTex公式OCR识别任务(完整代码)
  • 「Mac玩转仓颉内测版42」小学奥数篇5 - 圆和矩形的面积计算
  • Groom Blender to UE5
  • 开发一套ERP 第十弹 图片作为配置文件,本地读取图片,定时更新图片类型
  • 第七十六条:努力保持故障的原子性
  • Word分栏后出现空白页解决方法
  • 基于HTML和CSS的校园网页设计与实现
  • 【算法day7】字符串:反转与替换
  • 分布式存储厂商
  • 合合信息扫描全能王线下体验活动:科技与人文的完美交融
  • 单链表在Go语言中的实现与操作
  • 网关整合sentinel无法读取nacos配置问题分析
  • 简化XPath表达式的方法与实践
  • 【文件下载】接口传递文件成功和失败时,前端的处理方式
  • html+css网页设计马林旅行社移动端4个页面
  • 视频 的 音频通道提取 以及 视频转URL 的在线工具!
  • 容易被遗忘的测试用例
  • uni-app写的微信小程序如何实现账号密码登录后获取token,并且每天的第一次登录后都会直接获取参数而不是耀重新登录(2)
  • 统计中间件稳定性指标
  • 移动端使用REM插件postcss之postcss-px2rem
  • FPGA Xilinx维特比译码器实现卷积码译码
  • hive 行转列
  • Vue中使用ECharts图表中的阈值标记(附源码)
  • 【特征融合】融合空间域和频率域提升边缘检测能力