当前位置: 首页 > news >正文

【开源项目】Disruptor框架介绍及快速入门

Disruptor框架简介

Disruptor框架内部核心的数据结构是Ring Buffer,Ring Buffer是一个环形的数组,Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构;JDK的BlockingQueue相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟BlockingQueue一样,Disruptor框架也是围绕Ring Buffer实现生产者和消费者之间数据的交换,只不过Disruptor框架性能更高,笔者曾经在同样的环境下拿Disruptor框架跟ArrayBlockingQueue做过性能测试,Disruptor框架处理数据的性能比ArrayBlockingQueue的快几倍。

Disruptor框架性能为什么会更好呢?其有以下特点:

  1. 预加载内存可以理解为使用了内存池;
  2. 无锁化
  3. 单线程写
  4. 消除伪共享
  5. 使用内存屏障
  6. 序号栅栏机制

相关概念

  • Disruptor:是使用Disruptor框架的核心类,持有RingBuffer、消费者线程池、消费者集合ConsumerRepository和消费者异常处理器ExceptionHandler等引用;

  • Ring Buffer: RingBuffer处于Disruptor框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有Sequencer的引用;

  • Sequencer: Sequencer是Disruptor框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类SingleProducerSequencer和MultiProducerSequencer。

  • Sequence:Sequence被用来标识Ring Buffer和消费者Event Processor的处理进度,每个消费者Event Processor和Ring Buffer本身都分别维护了一个Sequence,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。

  • Sequence Barrier:Sequence Barrier即为序号屏障,通过追踪生产者的cursorSequence和每个消费者( EventProcessor)的sequence的方式来协调生产者和消费者之间的数据交换进度,其实现类ProcessingSequenceBarrier持有的WaitStrategy等待策略类是实现序号屏障的核心。

  • Wait Strategy:Wait Strategy是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢?

  • Event Processor:Event Processor可以理解为消费者线程,该线程会一直从Ring Buffer获取数据来消费数据,其有两个核心实现类:BatchEventProcessor和WorkProcessor。

  • Event Handler:Event Handler可以理解为消费者实现业务逻辑的Handler,被BatchEventProcessor类引用,在BatchEventProcessor线程的死循环中不断从Ring Buffer获取数据供Event Handler消费。

  • Producer:生产者,一般用RingBuffer.publishEvent来生产数据。

快速入门

MQManager启用Disruptor,返回RingBuffer实例。

@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理  ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂  HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率  int bufferSize = 1024 * 256;//单线程模式,获取额外的性能  Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者  disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程  disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件  RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
}

MessageModel消息实体类

@Data
public class MessageModel {  private String message;  
}

工厂类

public class HelloEventFactory implements EventFactory<MessageModel> {@Override  public MessageModel newInstance() {  return new MessageModel();  }  
}  

消息处理器

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Override  public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {  try {  log.info("消费者处理消息开始");  if (event != null) {  log.info("消费者消费的信息是:{}",event);  }  } catch (Exception e) {  log.info("消费者处理消息失败");  }  log.info("消费者处理消息结束");  }  
}  

消息发送

@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {  @Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Override  public void sayHelloMq(String message) {  log.info("record the message: {}",message);  //获取下一个Event槽的下标  long sequence = messageModelRingBuffer.next();  try {  //给Event填充数据  MessageModel event = messageModelRingBuffer.get(sequence);  event.setMessage(message);  log.info("往消息队列中添加消息:{}", event);  } catch (Exception e) {  log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());  } finally {  //发布Event,激活观察者去消费,将sequence传递给改消费者  //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer  messageModelRingBuffer.publish(sequence);  }  }  
} 

在这里插入图片描述

http://www.lryc.cn/news/68064.html

相关文章:

  • 双向链表实现约瑟夫问题
  • 日心说为人类正确认识宇宙打下了基础(善用工具的重要性)
  • Kali-linux系统指纹识别
  • Java版本电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展
  • Java字符串知多少:String、StringBuffer、StringBuilder
  • 中国20强(上市)游戏公司2022年财报分析:营收结构优化,市场竞争进入白热化
  • 如何自学C++编程语言,聊聊C++的特点,别轻易踩坑
  • 算法Day07 | 454.四数相加II,383. 赎金信,15. 三数之和, 18. 四数之和
  • ps抠图、抠头发去背景等
  • 计算机组成原理基础练习题第一章
  • [PyTorch][chapter 34][池化层与采样]
  • Java进阶-字符串的使用
  • 接口自动化框架对比 | 质量工程
  • 谷歌浏览器network error解决方法
  • 自动化测试如何做?接口自动化测试框架必备的9个功能,测试老鸟总结...
  • ANR原理篇 - ANR原理总览
  • 新版Mamba体验超快的软件安装
  • LDAP配置与安装
  • 1-Linux环境安装JDK
  • 通胀数据回落助金价小幅回升
  • 正则表达式的基本语法以及技巧和示例
  • 蓝牙耳机怎么挑选?小编分享2023畅销蓝牙耳机排行榜
  • Linux快照太有趣了!
  • 【改进粒子群优化算法】自适应惯性权重粒子群算法(Matlab代码实现)
  • ROS 下 激光扫描仪 YDLidar-G4 使用
  • 智能边缘:数字化时代的关键战略之一
  • EasyRecovery16中文最新版电脑数据恢复软件下载使用教程
  • 什么是鉴权?这些postman鉴权方式你又知道多少?
  • 最新的经典mysql面试题及答案
  • 算法修炼之练气篇——练气十九层