当前位置: 首页 > news >正文

Message Processing With Spring Integration高级应用:自定义消息通道与端点

一、Spring Integration 简介

Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。

核心目标:
  1. 提供简单的模型来实现复杂的企业集成。
  2. 支持与外部系统的集成。
  3. 提供模块化、松耦合的消息处理架构。

二、Spring Integration 核心组件

1. 消息(Message)
  • 定义:消息是 Spring Integration 的核心,包含 payload(负载)和 header(头部)。
  • 创建消息:通过 MessageBuilder 创建消息。

代码示例

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;Message<String> message = MessageBuilder.withPayload("Message Payload").setHeader("Message_Header1", "Header1_Value").setHeader("Message_Header2", "Header2_Value").build();

2. 消息通道(Message Channel)
  • 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
  • 类型
    • 点对点(Point-to-Point):每条消息最多被一个消费者接收。
    • 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
  • 常见实现
    • DirectChannel:默认点对点通道。
    • NullChannel:虚拟通道,用于测试和调试。
    • 其他:PublishSubscribeChannelQueueChannelPriorityChannel 等。

3. 消息端点(Message Endpoint)

消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:

  • Transformer:转换消息内容或结构。
  • Filter:过滤不符合条件的消息。
  • Router:根据条件将消息路由到不同的通道。
  • Splitter:将消息拆分为多个子消息。
  • Aggregator:将多个消息聚合为一个消息。
  • Service Activator:连接服务实例到消息系统。
  • Channel Adapter:连接消息通道与外部系统。

三、货物处理系统示例

1. 需求

实现一个货物处理系统,功能包括:

  1. 接收货物消息。
  2. 拆分货物列表为单个货物消息。
  3. 基于重量过滤货物。
  4. 根据运输类型(国内/国际)路由货物。
  5. 转换货物消息。
  6. 最终处理并记录货物信息。

2. 项目环境
  • JDK:1.8
  • Spring:4.1.2
  • Spring Integration:4.1.0
  • Maven:3.2.2
  • 操作系统:Ubuntu 14.04

3. 完整代码实现
Step 1:添加依赖

pom.xml 中添加 Spring 和 Spring Integration 的依赖:

<properties><spring.version>4.1.2.RELEASE</spring.version><spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties><dependencies><!-- Spring 核心依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><!-- Spring Integration 核心依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>${spring.integration.version}</version></dependency>
</dependencies>

Step 2:配置类

创建 AppConfiguration 类,配置消息通道和启用 Spring Integration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {@Beanpublic MessageChannel cargoGWDefaultRequestChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoSplitterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoFilterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoTransformerOutputChannel() {return new DirectChannel();}
}

Step 3:消息网关

定义 CargoGateway 接口,作为消息系统的入口:

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;import java.util.List;@MessagingGateway
public interface CargoGateway {@Gateway(requestChannel = "cargoGWDefaultRequestChannel")void processCargoRequest(Message<List<Cargo>> message);
}

Step 4:消息拆分器

实现 CargoSplitter,将货物列表拆分为单个货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;import java.util.List;@MessageEndpoint
public class CargoSplitter {@Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")public List<Cargo> splitCargoList(Message<List<Cargo>> message) {return message.getPayload();}
}

Step 5:消息过滤器

实现 CargoFilter,过滤重量超过限制的货物:

import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;@MessageEndpoint
public class CargoFilter {private static final double CARGO_WEIGHT_LIMIT = 1000.0;@Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")public boolean filterCargo(Cargo cargo) {return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;}
}

Step 6:服务激活器

实现 CargoServiceActivator,处理最终的货物消息:

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;@MessageEndpoint
public class CargoServiceActivator {@ServiceActivator(inputChannel = "cargoTransformerOutputChannel")public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);}
}

Step 7:运行主程序

创建 Application 类,初始化 Spring 容器并发送货物请求:

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;import java.util.Arrays;
import java.util.List;public class Application {public static void main(String[] args) {ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);CargoGateway gateway = context.getBean(CargoGateway.class);List<Cargo> cargos = Arrays.asList(new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),new Cargo(2, "Receiver2", "Address2", 1500, "International"));gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());}
}

四、运行过程

  1. 启动 Application 类。
  2. 系统会根据配置:
    • 拆分货物列表。
    • 过滤重量超过限制的货物。
    • 路由货物到不同的通道。
    • 最终处理并记录货物信息。
  3. 控制台输出处理结果。

五、适用场景

Spring Integration 非常适合以下场景:

  1. 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
  2. 消息驱动架构:如基于事件的微服务通信。
  3. 复杂消息处理:如批量处理、过滤、路由、转换等。
  4. 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。

通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring

http://www.lryc.cn/news/507805.html

相关文章:

  • S32K324 MCAL中的Postbuild和PreCompile使用
  • kubeadm_k8s_v1.31高可用部署教程
  • 【AI日记】24.12.22 容忍与自由 | 环境因素和个人因素
  • 【Java基础面试题030】Java和Go的区别?
  • 学习嵩山版《Java 开发手册》:编程规约 - 常量定义(P5)
  • 洛谷 P1595 信封问题 C语言递归
  • QT创建一个模板槽和信号刷新UI
  • 【计算机视觉基础CV-图像分类】01- 从历史源头到深度时代:一文读懂计算机视觉的进化脉络、核心任务与产业蓝图
  • C# cad启动自动加载启动插件、类库编译 多个dll合并为一个
  • Mybatis增删改查(配置文件版)
  • 【Spring Security系列】5 次密码错误触发账号锁定?Spring Security 高效实现方案详解
  • 笔记day5
  • Linux快速入门-兼期末快速复习使用
  • 浅谈文生图Stable Diffusion(SD)相关模型基础
  • Vivado使用VScode编译器
  • CEF127 编译指南 MacOS 篇 - 拉取 CEF 源码(五)
  • Jenkins 中 写 shell 命令执行失败,检测失败问题
  • Java程序打包成exe,无Java环境也能运行
  • 【java 正则表达式 笔记】
  • 基于PWLCM混沌映射的麋鹿群优化算法(Elk herd optimizer,EHO)的多无人机协同路径规划,MATLAB代码
  • Vue2五、自定义指令,全局局部注册、指令的值 ,插槽--默认插槽,具名插槽 ( 作用域插槽)
  • Pika Labs技术浅析(五):商业智能技术
  • YOLO-World:Real-Time Open-Vocabulary Object Detection
  • Fastjson <= 1.2.47 反序列化漏洞复现
  • 鸿蒙项目云捐助第二十一讲云捐助项目物联网IoT模拟器的使用
  • 大数据技术原理与应用期末复习-知识点(二)
  • 高效准确的PDF解析工具,赋能企业非结构化数据治理
  • C/C++ 数据结构与算法【栈和队列】 栈+队列详细解析【日常学习,考研必备】带图+详细代码
  • 读书笔记~管理修炼-缄默效应
  • 视频会议系统会前预约模块必须包含哪些功能?