当前位置: 首页 > news >正文

Spring+ActiveMQ

1. 环境搭建

1.1 env-version

JDK

1.8

Spring

2.7.13

Maven

3.6

ActiveMQ

5.15.2

1.2 docker-compose.yml

version: '3.8'services:activemq:image: rmohr/activemq:5.16.3container_name: activemqports:- "61616:61616"- "8161:8161"environment:- ACTIVEMQ_ADMIN_LOGIN=admin- ACTIVEMQ_ADMIN_PASSWORD=admin- ACTIVEMQ_CONFIG_MINMEMORY=512- ACTIVEMQ_CONFIG_MAXMEMORY=2048
#    volumes:
#     - ./data/activemq:/var/activemq/data
#     - ./conf/activemq.xml:/var/activemq/conf/activemq.xmlnetworks:- activemq-network    networks:activemq-network:driver: bridge

在这个docker-compose.yml文件中:

  • activemq服务使用了rmohr/activemq Docker镜像,这是一个社区维护的ActiveMQ镜像。请确保选择一个与你的Spring Boot版本兼容的ActiveMQ版本。

  • container_name设置了容器的名称。

  • ports映射了ActiveMQ的JMS端口(61616)和管理控制台端口(8161)到宿主机的相同端口。

  • environment部分设置了管理员账号和密码,以及JVM的最小和最大内存配置。这些可以根据需要进行调整。

  • volumes部分映射了宿主机的目录到容器内部,用于持久化ActiveMQ的数据和配置文件。你需要创建相应的目录并放置你的activemq.xml配置文件。

  • networks定义了一个自定义网络,以便ActiveMQ服务可以连接到其他可能需要的Docker服务。

在使用这个docker-compose.yml文件之前,请确保你已经创建了dataconf目录,并且在conf目录中放置了自定义的activemq.xml配置文件。如果不需要持久化存储,可以移除volumes部分。

1.3 添加依赖

<!-- ActiveMQ 依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring -->
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-spring</artifactId><version>5.15.4</version><!-- 排除依赖 --><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion>
</exclusions>
</dependency><!-- Spring Boot 与 JMS 集成的 starter -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><version>2.7.12</version>
</dependency>

2. 工程结构

activemq/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   ├── com/
│   │   │   │   ├── xiaokai/
│   │   │   │   │   ├── ActiveMQApplication.java   // 应用程序的主类,通常包含main方法
│   │   │   │   │   ├── config/                   // 配置包
│   │   │   │   │   │   └── JmsConfig.java        // ActiveMQ的配置类
│   │   │   │   │   ├── event/                   // 事件包
│   │   │   │   │   │   ├── Eventinfo.java        // 事件信息类:构建消息、send topic
│   │   │   │   │   ├── listener/                // 监听器包
│   │   │   │   │   │   └── MessageListener.java  // 消息监听器类
│   │   │   │   │   ├── service/                 // 服务包
│   │   │   │   │   │   └── ActiveMQService.java   // ActiveMQ服务类
│   │   │   │   │   
│   │   │   ├── resources/                      // 资源文件
│   │   │   │   └── application.yml             // Spring配置文件
│   │   │   
│   │   ├── test/                                // 测试代码
│   │   │   ├── java/                           // 测试Java代码
│   │   │   │   ├── com/
│   │   │   │   │   ├── xiaokai/
│   │   │   │   │   └── ActiveMQTest.java        // ActiveMQ的测试类
└── pom.xml                                     // Maven构建配置文件(未在文件内容中列出)

ActiveMQApplication.java:项目的主类,通常包含启动Spring应用程序的main方法。

JmsConfig.java:配置ActiveMQ的Java配置类。

Eventinfo.java:可能用于表示事件信息的类。

MessageListener.java:消息监听器,用于监听并处理ActiveMQ消息。

ActiveMQService.java:服务类,可能包含与ActiveMQ交互的业务逻辑。

application.yml:Spring Boot的配置文件,用于配置应用程序的各种参数。

ActiveMQTest.java:用于测试ActiveMQ功能的测试类。

3. 示例代码

JmsConfig.java

package com.xiaokai.config;import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import javax.jms.ConnectionFactory;/*** Author:yang* Date:2024-10-19 15:57*/
@Configuration
@EnableJms
public class JmsConfig {@Beanpublic ActiveMQConnectionFactory connectionFactory() {ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setBrokerURL("tcp://116.198.242.56:61616");connectionFactory.setUserName("admin");connectionFactory.setPassword("admin");return connectionFactory;}@Beanpublic DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setSessionTransacted(true);return factory;}}

MessageListener.java

package com.xiaokai.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;/*** Author:yang* Date:2024-10-19 15:55*/
@Component
@Slf4j
public class MessageListener {// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage(String message) {log.info("Received message: " + message);}}

ActiveMQService .java

package com.xiaokai.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;/*** Author:yang* Date:2024-10-19 16:01*/
@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;public void send(String message) {log.info("Sending message: {}", message);jmsMessagingTemplate.convertAndSend("test.queue", message);}
}

注:JmsMessagingTemplate作为Spring相关bean,封装了JmsTemplate 。总的来说JmsTemplate更底层,但是在使用过程中不需要过多关注底层实现。

@Autowired private JmsMessagingTemplate jmsMessagingTemplate; 
@Autowired private JmsTemplate jmsTemplate;

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin

注:在JmsConfig.java配置文件中配置后,可以不需要配置文件,二者选其一。

测试:

package com.xiaokai;import com.xiaokai.service.ActiveMQService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** Author:yang* Date:2024-10-21 10:28*/
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class ActiveMQTest {@Autowiredprivate ActiveMQService activeMQService;@Testpublic void testSend(){activeMQService.send("test");}
}

结果:

 Sending message: testStarted ActiveMQApplication in 1.368 seconds (JVM running for 1.799)Received message: test

注:在ActiveMQ提供的可视化控制台可以查看相关信息。

访问:http://116.198.242.56:8161/admin/index.jsp

4. 消息模型

4.1 P2P模型

bean

@Bean
public Destination queue() {return new ActiveMQQueue("test.queue");
}

消息监听器

@Component
@Slf4j
public class MessageListener {// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage1(String message) {log.info("Received queue message1: " + message);}// 监听队列test.queue@JmsListener(destination = "test.queue")public void onMessage2(String message) {log.info("Received queue message2: " + message);}}

消息服务

@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Destination queue;// 发送点对点消息public void sendP2P(String message) {log.info("Sending queue message: {}", message);jmsMessagingTemplate.convertAndSend(queue, message);}}

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin#    true表示使用发布/订阅模式,false表示使用点对点模式jms:pub-sub-domain: false

结论:点对点消息模式是将消息推送到queue中,消费者通过轮训的方式消费消息

4.2 发布/订阅模型

Bean

@Bean
public Destination topic() {return new ActiveMQTopic("test.topic");
}

消息监听器

@Component
@Slf4j
public class PubMessageListener {// 监听主题test.topic@JmsListener(destination = "test.topic")public void onMessage3(String message) {log.info("Received topic message1: " + message);}// 监听主题test.topic@JmsListener(destination = "test.topic")public void onMessage4(String message) {log.info("Received topic message2: " + message);}}

消息服务

@Service
@Slf4j
public class ActiveMQService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Destination topic;// 发送发布订阅消息public void sendPubSub(String message) {log.info("Sending topic message: {}", message);jmsMessagingTemplate.convertAndSend(topic, message);}}

application.yml

spring:activemq:broker-url: tcp://116.198.242.56:61616user: adminpassword: admin#    true表示使用发布/订阅模式,false表示使用点对点模式jms:pub-sub-domain: true

注:需要将发布/订阅开关打开

结论:发送消息后,订阅主题的消费者都能收到同一条消息去消费。

5. 消息类型

5.1 普通消息

        普通消息如上述案例,生产者生产消息后,由消费者消费消息,中间不需要做额外的事情。

5.2 延迟消息

延迟消息指在生产者生产带有延迟时间的消息后,broker接收到消息后,并不立即投送到队列或者主题,而是到达延迟时间后,再将消息投送到队列、主题。

配置ActiveMQ支持延迟消息: 修改ActiveMQ的配置文件activemq.xml,确保<broker>标签包含schedulerSupport="true"属性。这允许ActiveMQ的计划任务功能,从而支持延迟消息。

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

修改配置后,需要重启ActiveMQ服务器以使更改生效。

发送延迟消息

// 发送延迟消息
public void sendDelay(String message) {HashMap<String, Object> properties = new HashMap<>();properties.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);log.info("Sending delay queue message: {}", message);jmsMessagingTemplate.convertAndSend(queue, message, properties);
}

消息监听器

...

5.3 事务消息

        没啥用,用ActiveMQ实现事务消息还不如不用,辣鸡(狗头保命)

http://www.lryc.cn/news/470233.html

相关文章:

  • Linux 常用命令总汇
  • fmql之Linux RTC
  • Flask-SocketIO 简单示例
  • Vue 3 的组件式开发(2)
  • python 爬虫 入门 四、线程,进程,协程
  • cloak斗篷伪装下的独立站
  • 【Nas】X-DOC:在Mac OS X 中使用 WOL 命令唤醒局域网内 PVE 主机
  • u盘装win10系统提示“windows无法安装到这个磁盘,选中的磁盘采用GPT分区形式”解决方法
  • Linux系统之dc计算器工具的基本使用
  • 使用Python计算相对强弱指数(RSI)进阶
  • vue 解决:npm ERR! code ERESOLVE 及 npm ERR! ERESOLVE could not resolve 的方案
  • Android 原生开发与Harmony原生开发浅析
  • VIVO售后真好:屏幕绿线,4年免费换屏
  • 数据类型【MySQL】
  • 流媒体协议.之(RTP,RTCP,RTSP,RTMP,HTTP)(二)
  • 在 Kakarot ZkEVM 上使用 Starknet Scaffold 构建应用
  • DBeave如何连接达梦数据库,设置达梦驱动,真酷
  • 2024年全球 MoonBit 编程创新赛-零基础早鸟教程-使用wasm4八小时开发井子棋小游戏
  • 机器学习4
  • Python数值计算(33)——simpson 3/8积分公式
  • <项目代码>YOLOv8路面垃圾识别<目标检测>
  • Java中的注解(白金版)
  • actor模型
  • 合约门合同全生命周期管理系统:企业智能合同管理的新时代
  • vscode如何debug环境配置?torchrun与deepspeed库又该如何配置?
  • Qt元对象系统 —— 信号与槽
  • 单细胞配色效果模拟器 | 简陋版(已有颜色数组)
  • 面向对象编程中类与类之间的关系(一)
  • streamlit 实现 flink SQL运行界面
  • 鲸鱼优化算法(Whale Optimization Algorithm, WOA)原理与MATLAB例程