当前位置: 首页 > news >正文

单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>

定义一个商品

@Data
public class Goods {private String name;}

定义生产者

public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b)  {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

http://www.lryc.cn/news/253162.html

相关文章:

  • 好工具知多少:国内外最常用的SCADA软件
  • SQL Server 2016(创建数据库)
  • Vue学习计划--Vue2(一)简单了解vue
  • 微信小程序生成二维码并保存到本地方法
  • shell_exec 和 exec区别
  • WPF创建进度条
  • 全网最新最全面的Appium自动化:Appium常用操作之混合应用webview页面操作--待补充!
  • 基于OpenCV+YOLOv5实现车辆跟踪与计数(附源码)
  • 05、pytest断言确定的异常
  • 金蝶云星空单据编辑界面,不允许批量填充操作
  • Springboot项目启动成功后可通过五种方式继续执行
  • 什么是供应链金融分账系统?
  • 【测绘程序设计】——坐标换带与高程投影
  • 企业计算机服务器中了Mallox勒索病毒如何解密,Mallox勒索病毒数据恢复
  • 一套rk3588 rtsp服务器推流的 github 方案及记录 -01
  • PyQt6 QComboBox下拉组合框控件
  • 常用类与比较器
  • 【上海大学《面向对象程序设计A》课程小项目报告】抽象向量类模板及其派生类
  • Leetcode每日一题学习训练——Python3版(到达首都的最少油耗)
  • Java面试题(每天10题)-------连载(42)
  • netty websocket学习
  • 【数据结构】环形队列
  • 嵌入式C编码规范
  • Golang 并发 — 流水线
  • Elasticsearch:什么是非结构化数据?
  • 15:00的面试,15:06就出来了,问的问题过于变态了。。。
  • Web自动化测试怎么做?Web网页测试全流程解析
  • MySQL数据库SQLSTATE[22007]: Invalid datetime format 日期类型不能为空值的解决办法
  • 搬运工让你分分钟了解Web接口测试
  • 作业12.5