当前位置: 首页 > news >正文

Disruptor 高性能环形消息框架

官方文档:Disruptor

1. 简介

        Disruptor是一个高性能的互进程(Inter-process)和多线程(Multi-threaded)消息处理库,由LMAX交易所开发,用于在Java虚拟机(JVM)上实现高性能的交换和处理数据。Disruptor的核心目标是提供一种低延迟、高吞吐量的解决方案。

一些关键特性:

  • Disruptor设计用来减少延迟,因为它避免了使用锁和线程间上下文切换,而是采用了一种基于缓存行(cache line)的设计理念。

  • 通过使用无锁编程和环形缓冲区(Ring Buffer)来实现高效的数据交换,Disruptor能够达到纳秒级别的延迟。

  • Disruptor的API简洁,易于理解和使用,使得开发者可以快速地集成到现有的系统中。

  • 它支持多种消息处理模式,包括单线程、多线程和多进程处理。

  • Disruptor基于事件驱动模型,可以处理事件的发布和订阅。

  • 它保证了事件的顺序和一致性,这对于需要顺序处理的业务场景非常重要。

  • Disruptor可以轻松地扩展以适应不同的处理需求,无论是增加消费者数量还是处理不同类型的事件。

  • 它提供了精细的内存管理策略,包括预分配内存和内存屏障(Memory Barriers)的使用,以确保数据的可见性和一致性。

Disruptor的工作流程大致如下:

  • 生产者(Producer):向环形缓冲区发布事件。

  • 消费者(Consumer):从环形缓冲区读取事件并处理。

  • 环形缓冲区(Ring Buffer):一个固定大小的缓冲区,事件被顺序地写入,并且可以被多个消费者并发读取。

  • 序列屏障(Sequence Barrier):用于同步生产者和消费者之间的进度,确保消费者不会读取到未完全写入的事件。

2. 比较

        与Spring消息监听器、Redis发布/订阅、Guava的EventBus一样,提供生产消息与消费消息的能力,极大的解耦应用程序各模块。

特性/技术DisruptorRedis发布订阅Guava EventBusSpring消息监听器
设计目的高性能、低延迟的消息处理分布式消息传递简化事件发布和订阅流程与Spring框架集成的声明式事件处理
性能极高,纳秒级延迟较高,受网络延迟影响适中,适用于中等负载适中,依赖于Spring事件传播机制
可靠性高,适合关键任务较低,消息可能会丢失适中,需要正确管理订阅和发布高,Spring框架提供事务支持
易用性低,需要深入了解并发编程高,简单的发布订阅模型高,直观的API和灵活的线程模型高,Spring框架提供注解支持
分布式支持否,仅限于单个JVM内部是,支持跨多个节点和应用的消息传递否,仅限于单个JVM内部否,仅限于单个JVM内部(除非结合消息中间件)
线程模型无锁设计,多线程多线程,基于发布订阅机制支持同步和异步事件分发支持同步和异步事件分发
适用场景高性能计算,如金融交易系统分布式系统的消息传递,如微服务架构简单的事件驱动应用,需要灵活的事件处理需要Spring框架支持的企业级应用
配置复杂度高,需要配置事件、工厂和处理器低,Redis简单配置即可使用低,通过注解或API简单配置低,Spring框架自动配置
社区和文档活跃,由LMAX提供支持非常活跃,Redis社区广泛支持活跃,Guava库由Google维护非常活跃,Spring社区广泛支持
扩展性高,可以自定义事件和处理器高,可以与其他Redis特性结合使用高,可以自定义事件处理逻辑高,可以自定义事件和监听器
持久性否,内存中处理,不提供持久化是,可以结合Redis持久化选项否,内存中处理,不提供持久化可以结合数据库事务管理持久化

3. 实例

3.1 添加依赖

<!--        disruptor-->
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version>
</dependency>

3.2 消息实体

package org.example.event;public class TradeEvent {private long tradeId;private String symbol;private double price;private int volume;// Constructor, getters and setterspublic TradeEvent() {}public long getTradeId() {return tradeId;}public void setTradeId(long tradeId) {this.tradeId = tradeId;}public String getSymbol() {return symbol;}public void setSymbol(String symbol) {this.symbol = symbol;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public int getVolume() {return volume;}public void setVolume(int volume) {this.volume = volume;}
}

3.3 消息实体工厂

import com.lmax.disruptor.EventFactory;public class TradeEventFactory implements EventFactory<TradeEvent> {@Overridepublic TradeEvent newInstance() {return new TradeEvent();}
}

3.4 消息处理器

public class TradeEventHandler implements EventHandler<TradeEvent> {@Overridepublic void onEvent(TradeEvent event, long sequence, boolean endOfBatch) {System.out.println(String.format("Trade Event. ID: %d, Symbol: %s, Price: %.2f, Volume: %d",event.getTradeId(), event.getSymbol(), event.getPrice(), event.getVolume()));}
}

3.5 配置disruptor启动器

package org.example.config;import com.lmax.disruptor.dsl.Disruptor;
import org.example.event.TradeEvent;
import org.example.event.TradeEventFactory;
import org.example.event.TradeEventHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.Executor;
import java.util.concurrent.Executors;@Configuration
public class DisruptorConfig {@Beanpublic Disruptor<TradeEvent> disruptor() {int bufferSize = 1024;Executor executor = Executors.newCachedThreadPool();Disruptor<TradeEvent> disruptor = new Disruptor<>(new TradeEventFactory(), bufferSize, executor);disruptor.handleEventsWith(new TradeEventHandler());System.out.println("Disruptor created.");disruptor.start();return disruptor;}}

3.6 测试

curl --request POST \--url 'http://localhost:8080/trade?apipost_id=35ef4f1dbd9000' \--header 'Accept: */*' \--header 'Accept-Encoding: gzip, deflate, br' \--header 'Connection: keep-alive' \--header 'Content-Type: application/json' \--header 'User-Agent: PostmanRuntime-ApipostRuntime/1.1.0' \--data '{"tradeId":1111,"symbol":"测试标记","price":32.3,"volume":3
}'

控制台输出

Trade Event. ID: 1111, Symbol: 测试标记, Price: 32.30, Volume: 3

http://www.lryc.cn/news/510651.html

相关文章:

  • Python列表(二)
  • 计算机网络:应用层 —— 网络应用模式
  • @Repository注解和@mapper的区别
  • 解锁成长密码:探寻刻意练习之道
  • cuda-cuDnn
  • 如何使用Python和PIL库生成带竖排文字的封面图像
  • 低代码开发 实战转型案例一览
  • SQL Server中FIRST_VALUE和 LAST_VALUE窗口函数允许在一个指定的窗口内返回第一个或最后一个值
  • 机器学习-高斯混合模型
  • 微信V3支付报错 平台证书及平台证书序列号
  • 41.欠采样技术下变频不能用与跨两个nyquist的情况下
  • 20241227通过配置nomodeset参数解决更新grub之后,ubuntu20.04.5无法启动的问题
  • 从 GitLab.com 到 JihuLab.com 的迁移指南
  • 深度学习中的并行策略概述:2 Data Parallelism
  • Python大数据可视化:基于Python对B站热门视频的数据分析与研究_flask+hive+spider
  • 利用 Python 编写一个 VIP 音乐下载脚本
  • linux内核如何实现TCP的?
  • 【Spring】基于XML的Spring容器配置——FactoryBean的使用
  • Docker使用——国内Docker的安装办法
  • 电商会员门店消费数据分析
  • Vue.js 入门与进阶:打造高效的前端开发体验
  • Java包装类型的缓存
  • 【蓝桥杯——物联网设计与开发】拓展模块4 - 脉冲模块
  • .NET平台用C#通过字节流动态操作Excel文件
  • SpringMVC详解
  • springboot、spring、springmvc有哪些注解
  • Apache Commons ThreadUtils 的使用与优化
  • 重温设计模式--5、职责链模式
  • 下午四点半
  • 嵌入式单片机中Flash存储器控制与实现