当前位置: 首页 > news >正文

创建延时队列、springboot配置多个rabbitmq

创建延时队列

queue.file_delay_destroy
x-dead-letter-exchange:	exchange.file_delay_destroy
x-message-ttl:	259200000
x-message-ttl 的单位是毫秒:259200000 为 3 天,1000 为 1 秒

在这里插入图片描述

创建普通队列

queue.file_destroy

创建普通交换机

exchange.file_delay_destroy

type选择fanout
在这里插入图片描述

交换机绑定普通队列

(图中已经绑定,红框为绑定过程)
在这里插入图片描述

普通队列绑定交换机

(图中已经绑定,红框为绑定过程)
在这里插入图片描述

延时队列

springboot配置多个rabbitmq

消息在延时队列 queue.file_delay_destroy 中停留超过 x-message-ttl 后成为死信,经交换机 exchange.file_delay_destroy 转发到 queue.file_destroy,由消费者执行删除文件操作

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;

/**
 * Envelope for a message exchanged over the message queue.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by Lombok.
 * For Jackson, {@code null} properties are left out when writing and unknown properties are
 * ignored when reading.
 */
@Data
@EqualsAndHashCode
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class MQMessage implements Serializable {

    /** Message payload. */
    private JSONObject msg;

    /** Unique identifier of the sent message. */
    private String messageId;
}
import com.sxqx.entity.MQMessage;

/**
 * Contract for components that publish an {@link MQMessage} to a message queue.
 */
public interface MQMessageSender {

    /**
     * Publishes a message.
     *
     * @param queue name of the message queue the message is addressed to
     * @param msg   the message to publish
     */
    void send(String queue, MQMessage msg);
}

RabbitConfig配置类

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Wires two independent RabbitMQ brokers ("mq1" and "mq2").
 *
 * <p>For each broker this class declares a connection factory, a {@link RabbitTemplate} and a
 * listener container factory. The mq1 beans are marked {@link Primary}, so they are the ones
 * injected when no qualifier is given. Connection settings are read from
 * {@code spring.rabbitmq.mq1.*} and {@code spring.rabbitmq.mq2.*}.
 */
@Configuration
public class RabbitConfig {

    /** Connection factory for broker mq1 (primary). */
    @Primary
    @Bean(name = "mq1ConnectionFactory")
    public ConnectionFactory mq1ConnectionFactory(
            @Value("${spring.rabbitmq.mq1.host}") String host,
            @Value("${spring.rabbitmq.mq1.port}") int port,
            @Value("${spring.rabbitmq.mq1.username}") String username,
            @Value("${spring.rabbitmq.mq1.password}") String password,
            @Value("${spring.rabbitmq.mq1.virtual-host}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /** Connection factory for broker mq2. */
    @Bean(name = "mq2ConnectionFactory")
    public ConnectionFactory mq2ConnectionFactory(
            @Value("${spring.rabbitmq.mq2.host}") String host,
            @Value("${spring.rabbitmq.mq2.port}") int port,
            @Value("${spring.rabbitmq.mq2.username}") String username,
            @Value("${spring.rabbitmq.mq2.password}") String password,
            @Value("${spring.rabbitmq.mq2.virtual-host}") String virtualHost) {
        return newConnectionFactory(host, port, username, password, virtualHost);
    }

    /** Template publishing to broker mq1 (primary). */
    @Primary
    @Bean(name = "mq1RabbitTemplate")
    public RabbitTemplate mq1RabbitTemplate(
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        return newRabbitTemplate(connectionFactory);
    }

    /** Template publishing to broker mq2. */
    @Bean(name = "mq2RabbitTemplate")
    public RabbitTemplate mq2RabbitTemplate(
            @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {
        return newRabbitTemplate(connectionFactory);
    }

    /** JSON message converter installed on both templates. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** Listener container factory for broker mq1 (primary). */
    @Bean(name = "mq1Factory")
    @Primary
    public SimpleRabbitListenerContainerFactory mq1Factory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /** Listener container factory for broker mq2. */
    @Bean(name = "mq2Factory")
    public SimpleRabbitListenerContainerFactory mq2Factory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {
        return newListenerFactory(configurer, connectionFactory);
    }

    /** Builds a caching connection factory with publisher confirms and returns switched on. */
    private static ConnectionFactory newConnectionFactory(
            String host, int port, String username, String password, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // NOTE(review): newer Spring AMQP releases deprecate setPublisherConfirms in favour of
        // setPublisherConfirmType - confirm against the version this project builds with.
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /** Builds a template on the given connection factory that uses the JSON converter. */
    private RabbitTemplate newRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /** Builds a listener container factory and lets the configurer apply its settings to it. */
    private static SimpleRabbitListenerContainerFactory newListenerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

mq1

@Component
public class RabbitMQ1MessageSender  implements MQMessageSender, RabbitTemplate.ConfirmCallback{@Resource(name = "mq1RabbitTemplate")private RabbitTemplate mq1RabbitTemplate;Log log = LogFactory.getLog(RabbitMQ1MessageSender.class);@Autowiredpublic RabbitMQ1MessageSender() {}@PostConstructpublic void init(){mq1RabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){log.error("消息接收失败" + cause);// 我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}@Overridepublic void send(String routingKey, MQMessage msg) {String jsonString = JsonConverter.bean2Json(msg);if (jsonString != null) {mq1RabbitTemplate.convertAndSend(routingKey, jsonString);}}
}

mq2

import com.sxqx.entity.MQMessage;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Component
public class RabbitMQ2MessageSender implements MQMessageSender, RabbitTemplate.ConfirmCallback{@Resource(name = "mq2RabbitTemplate")private RabbitTemplate mq2RabbitTemplate;Log log = LogFactory.getLog(RabbitMQ2MessageSender.class);@Autowiredpublic RabbitMQ2MessageSender() {}@PostConstructpublic void init(){mq2RabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){log.error("消息接收失败" + cause);// 我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}@Overridepublic void send(String routingKey, MQMessage msg) {String jsonString = JsonConverter.bean2Json(msg);if (jsonString != null) {mq2RabbitTemplate.convertAndSend(routingKey, jsonString);}}
}

application-prod.yaml

# NOTE(review): the nesting below was reconstructed from a whitespace-collapsed listing - verify
# it against the real file. RabbitConfig reads only host, port, username, password and
# virtual-host from each mqN section via @Value; confirm whether the remaining keys are bound
# anywhere.
spring:
  rabbitmq:
    mq1:
      username: guest
      password: guest
      host: mq1_ip
      port: 5672
      virtual-host: /
      publisher-returns: true
      publisher-confirm-type: simple
      listener:
        simple:
          acknowledge-mode: auto # automatic acknowledgement (the old comment wrongly said manual)
          prefetch: 10 # prefetch count is 10 (the old comment wrongly said one message at a time)
          retry:
            max-attempts: 5 # number of attempts
            enabled: true # enable retry
          concurrency: 5
          max-concurrency: 10
    mq2:
      username: guest
      password: guest
      host: mq2_ip
      port: 5672
      virtual-host: /
      publisher-returns: true
      publisher-confirm-type: simple
      listener:
        simple:
          acknowledge-mode: auto # automatic acknowledgement (the old comment wrongly said manual)
          prefetch: 10 # prefetch count is 10 (the old comment wrongly said one message at a time)
          retry:
            max-attempts: 5 # number of attempts
            enabled: true # enable retry
          concurrency: 5
          max-concurrency: 10

mq1消费端,发消息给mq2

@Component
public class SyncWeatherLivePicMessageReceiver  implements IMessageReceiver {private final IWeatherLivePicMapper weatherLivePicMapper;private final RabbitMQ2MessageSender rabbitMQ2MessageSender;@Autowiredpublic SyncWeatherLivePicMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper,RabbitMQ2MessageSender rabbitMQ2MessageSender) {this.weatherLivePicMapper = weatherLivePicMapper;this.rabbitMQ2MessageSender = rabbitMQ2MessageSender;}Log log = LogFactory.getLog(SyncWeatherLivePicMessageReceiver.class);private FtpHelper ftpHelperIns1;private FtpHelper ftpHelperIns2;private FTPFileFilter ftpFileFilter;@RabbitListener(queuesToDeclare = {@Queue(name = "sxqxgzbgxw_weather_live_pic")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");JsonNode JsonNodeParams = msg.findValue("params");Map<String, Object> params = JsonConverter.jsonNode2HashMap(JsonNodeParams);if (params.size() > 0) {String times = params.get("times").toString();String serverFrom = params.get("serverFrom").toString();......// 清除数据MQMessage mqMessage = new MQMessage();JSONObject jsonObject = new JSONObject();jsonObject.put("filePath", "/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times);mqMessage.setMsg(jsonObject);rabbitMQ2MessageSender.send("queue.file_delay_destroy", mqMessage);// 清除DB记录MQMessage mqMessage2 = new MQMessage();JSONObject jsonObject2 = new JSONObject();jsonObject2.put("tableName", "WEATHER_LIVE_PIC");jsonObject2.put("picUrl", "http://外网ip/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times +"/"+ type + "/" + "gjzjqyz/"+ fileNameFrom);mqMessage2.setMsg(jsonObject);rabbitMQ2MessageSender.send("queue.db_delay_destroy", mqMessage2);                }}
}

mq2消费端用于递归删除文件

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.utils.dataConverter.JsonConverter;
import com.sxqx.utils.file.FileHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.File;@Component
public class FileDestroyMessageReceiver implements IMessageReceiver {Log log = LogFactory.getLog(FileDestroyMessageReceiver.class);@RabbitListener(queuesToDeclare = {@Queue(name = "queue.file_destroy")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");String filePath = msg.findValue("filePath").asText();if (filePath.contains("/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000")) {File file = new File(filePath);if (file.exists()) {FileHelper.deleteFile(file);}} else {log.info("有人想删除设定之外的文件");log.info(filePath);}}
}

FileHelper工具类递归删除文件或文件夹

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;public class FileHelper {/*** 递归删除文件或文件夹* @param directory*/public static void deleteFile(File directory) {if (!directory.exists()) {return;}File[] files = directory.listFiles();if (files!=null) {//如果包含文件进行删除操作for (File value : files) {if (value.isFile()) {//删除子文件value.delete();} else if (value.isDirectory()) {//通过递归的方法找到子目录的文件deleteFile(value);}value.delete();//删除子目录}}directory.delete();}
}

mq2消费端用于删除数据库数据

import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.mapper.remote.xugugzb.weatherlivepic.IWeatherLivePicMapper;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Objects;@Component
public class DBDestroyMessageReceiver implements IMessageReceiver {private final IWeatherLivePicMapper weatherLivePicMapper;@Autowiredpublic DBDestroyMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper){this.weatherLivePicMapper = weatherLivePicMapper;}Log log = LogFactory.getLog(DBDestroyMessageReceiver.class);@RabbitListener(queuesToDeclare = {@Queue(name = "queue.db_destroy")})@RabbitHandler@Overridepublic void onMessageReceived(String mqMessageString) {JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);JsonNode msg = jsonNode.findValue("msg");String tableName = msg.findValue("tableName").asText();String picUrl = msg.findValue("picUrl").asText();if (picUrl.contains("/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/")) {if (Objects.equals("WEATHER_LIVE_PIC",tableName)) {weatherLivePicMapper.deleteWeatherLivePic(picUrl);}} else {log.info("有人想删除设定之外的数据");log.info(picUrl);}}
}

nginx.conf

上传静态资源至linux path,配置nginx.conf,使浏览器可以直接访问静态资源

server {
    listen       80;
    server_name  60.204.202.112;
    # NOTE(review): nginx inherits add_header from an outer level only when the inner level
    # declares none, so this header is not sent for /dataSharingStatic or /api/ - confirm that
    # this is intended.
    add_header   Cache-Control no-store;
    charset      utf-8;

    location / {
        root   /mnt/sxqxgxw-gzb-front/dist/;
        try_files $uri $uri/ /index.html;
        index  index.html index.htm;
    }

    # A request for /dataSharingStatic is served from /data/static/dataSharingStatic.
    location /dataSharingStatic {
        root   /data/static;
        autoindex   on;
        add_header  Access-Control-Allow-Origin *;
        # Header name corrected from the misspelled "X-Requestd-With".
        add_header  Access-Control-Allow-Headers X-Requested-With;
        add_header  Access-Control-Allow-Methods GET,POST,OPTIONS;
    }

    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   html;
    }

    # Strips the /api/ prefix and forwards the request to the backend.
    location /api/ {
        proxy_pass    http://60.204.202.112:8896;
        rewrite       ^/api/(.*)$ /$1 break;
        add_header    Access-Control-Allow-Origin *;
        # Header name corrected from the misspelled "X-Requestd-With".
        add_header    Access-Control-Allow-Headers X-Requested-With;
        add_header    Access-Control-Allow-Methods GET,POST,OPTIONS;
    }
}
http://www.lryc.cn/news/143795.html

相关文章:

  • 在kaggle中用GPU使用CGAN生成指定mnist手写数字
  • 【NI USRP】哪些 USRP 设备支持全双工,哪些支持半双工?
  • 不拼花哨,只拼实用:unittest指南,干货为王!
  • mysql 获取json数组中某个字段根据下标
  • 深入理解Redis缓存穿透、击穿、雪崩及解决方案
  • java八股文面试[java基础]——字节码
  • 新能源汽车技术的最新进展和未来趋势
  • 知虾shopee数据分析工具:shopee出单的商机利器
  • python——ydata-profiling介绍与使用
  • (纯c)数据结构之------>链表(详解)
  • postman接口自动化测试框架实战!
  • Apache Doris 入门教程35:多源数据目录
  • 响应式web-PC端web与移动端web(H5)兼容适配 选型方案
  • Redis持久化之RDB解读
  • 四维图新 minemap实现地图漫游效果
  • centos7安装MySQL8
  • 【IMX6ULL驱动开发学习】10.Linux I2C驱动实战:AT24C02驱动设计流程
  • 【C++】详解声明和定义
  • 掌握C/C++协程编程,轻松驾驭并发编程世界
  • MyBatis-Plus的分页配置类
  • 排序算法-选择排序(Java)
  • SpringBoot 怎么返回html界面
  • watch computed 和 method
  • 数据结构,线性表有哪些
  • 服务间通过Feign相互调用报错,参数是MultiparFile、参数是POJO报错
  • Flutter系列文章-Flutter应用优化
  • opencv案例03 -基于OpenCV实现二维码生成,发现,定位,识别
  • 叠螺式污泥脱水机的要点及价格分析
  • Visual Studio中Linux开发头文件intellisense问题的解决办法
  • 如何以CPU方式启动Stable Diffusion WebUI?