当前位置: 首页 > news >正文

Spring Boot关闭时,如何确保内存里面的mq消息被消费完?

1.背景

之前写一篇文章Spring Boot集成disruptor快速入门demo,有网友留言如下图:

1730887217742

针对网友的留言,那么我们如何解决这个问题呢

  • Spring-Boot应用停机时,如何保证其内存消息都处理完成?

2.解决方法

方法其实挺简单的,disruptor有优雅停机方法,不用我们自己去实现逻辑,只需要调用 disruptor.shutdown() ;就可以实现优雅关闭。

1.禁用kill -9

使用kill -9命令强制终止进程在某些情况下可能会导致数据丢失或资源未正确释放。以下是一些原因和替代方案,帮助你安全地停止应用程序:

为什么避免使用kill -9
  1. 数据丢失kill -9会立即终止进程,不会给应用程序任何机会去保存数据或完成正在进行的操作。

  2. 资源泄漏:进程被强制终止后,可能无法正确释放内存、文件句柄或网络连接等资源。

  3. 不执行清理逻辑:应用程序通常在关闭时执行一些清理逻辑(如关闭数据库连接、写入日志等),kill -9会跳过这些步骤。

2.优雅停机方案

Spring Boot可以引入这个包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

放开shutdown接口

management:endpoints:web:exposure:include: "*"endpoint:shutdown:enabled: true
server:port: 8088

然后post http://127.0.0.1:8088/actuator/shutdown 实现优雅停机,但是spring boot 2.3以下,停止后不能停止api继续对外。我们可以使用过滤器来禁止api对外提供服务,手动设置HttpServletResponse.SC_SERVICE_UNAVAILABLE

package com.et.disruptor.config;import org.springframework.stereotype.Component;import javax.servlet.*;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;@Component
public class GracefulShutdownFilter implements Filter {private final AtomicBoolean shuttingDown = new AtomicBoolean(false);@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)throws IOException, ServletException {if (shuttingDown.get()) {((HttpServletResponse) response).setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);return;}chain.doFilter(request, response);}public void startShutdown() {shuttingDown.set(true);}
}

DisposableBean是Spring框架中的一个接口,用于在Spring容器销毁Bean时执行一些自定义的清理逻辑。实现这个接口的Bean会在容器关闭时自动调用其destroy()方法。这对于需要在应用程序关闭时释放资源或执行其他清理操作的Bean非常有用。

package com.et.disruptor.config;import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class GracefulShutdownManager implements DisposableBean {@Autowiredprivate GracefulShutdownFilter shutdownFilter;@AutowiredMqManager mqManager;@Overridepublic void destroy() throws Exception {// reject  new  requestsshutdownFilter.startShutdown();//graceful shutdown DisruptormqManager.shutdownDisruptor(); // wait all events to complete// wait all  your self-definite task finishwaitForTasksToComplete();}private void waitForTasksToComplete() throws InterruptedException {System.out.println("Waiting for tasks to complete...");// use CountDownLatch or other//mock task processThread.sleep(100000);}
}

3.disruptor优雅关闭

如果不想显示的调用shutdown 也可以用注解@PreDestroy

package com.et.disruptor.config;import com.et.disruptor.event.HelloEventFactory;
import com.et.disruptor.event.HelloEventHandler;
import com.et.disruptor.model.MessageModel;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Configuration
public class MqManager {private static Disruptor<MessageModel> disruptor;@Bean("ringBuffer")public RingBuffer<MessageModel> messageModelRingBuffer() {//define the thread pool for consumer message hander, Disruptor touch the consumer event to process by java.util.concurrent.ExecutorSerivceExecutorService executor = Executors.newFixedThreadPool(2);//define Event FactoryHelloEventFactory factory = new HelloEventFactory();//ringbuffer byte sizeint bufferSize = 1024 * 256;disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());//set consumer eventdisruptor.handleEventsWith(new HelloEventHandler());//start disruptor threaddisruptor.start();//gain ringbuffer ring,to product eventRingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}//@PreDestroypublic void shutdownDisruptor() {if (disruptor != null) {System.out.println("close Disruptor...");disruptor.shutdown(); //cl0se Disruptor}}
}

shudown方法源码

public void shutdown(long timeout, TimeUnit timeUnit) throws TimeoutException {long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);do {if (!this.hasBacklog()) {this.halt();return;}} while(timeout < 0L || System.currentTimeMillis() <= timeOutAt);throw TimeoutException.INSTANCE;
}

这里会等到所有内存消息全部处理完

private boolean hasBacklog() {long cursor = this.ringBuffer.getCursor();Sequence[] var3 = this.consumerRepository.getLastSequenceInChain(false);int var4 = var3.length;for(int var5 = 0; var5 < var4; ++var5) {Sequence consumer = var3[var5];if (cursor > consumer.get()) {return true;}}return false;
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(Disruptor)

3.测试

  • 启动Spring Boot应用
  • post 请求http://127.0.0.1:8088/actuator/shutdown实现优雅停止
  • 访问http://127.0.0.1:8088/hello,会报503错误
  • 后台会等待Disruptor处理内存消息
  • 后台等待处理其他的异步任务
  • 最后关闭Spring Boot应用

4.引用

  • https://medium.com/@contact.technovisionconsulting/how-to-achieve-a-graceful-shutdown-in-spring-boot-ec93d55916ed
  • Spring Boot关闭时,如何确保内存里面的mq消息被消费完? | Harries Blog™
http://www.lryc.cn/news/478039.html

相关文章:

  • HTML 基础标签——文本内容标签 <ul>、<ol>、<blockquote> 、<code> 等标签的用法详解
  • 高效管理社团:Spring Boot在校园社团信息管理中的应用
  • mysql约束和高级sql
  • 蓝桥杯真题——三角回文数(C语言)
  • uni-app 封装图表功能
  • Kubernetes的基本构建块和最小可调度单元pod-0
  • QT创建按钮篇
  • 初级软件测试工程师就别出口喊15K了,连自动化测试都不会,还不如应届生
  • Mybatis查询数据库,返回List集合,集合元素也是List。
  • SQL 视图:概念、应用与最佳实践
  • ubuntu交叉编译expat库给arm平台使用
  • 成都郝蓉宜恺文化传媒有限公司以诚信经营赢得客户长期信赖
  • LabVIEW for Linux 介绍
  • 一次32bit有符号数据类型转换为64bit无符号数据类型引发的溢出错误
  • aosp安卓15新特性dump的wms窗口层级树优化的更加美观
  • git的使用、router和route的区别以及v-show和v-if的差别
  • Win系统通过命令行查看笔记本电池损耗/寿命/健康
  • 【安当产品应用案例100集】029-使用安全芯片保护设备核心业务逻辑
  • Redis高级篇之缓存一致性详细教程
  • C++ 文件操作详解
  • 计算机网络:网络层 —— 边界网关协议 BGP
  • Python数据可视化seaborn
  • Java继承练习
  • Excel怎么转换成word?分享两种方法!
  • nignx代理获取真实地址request.getRequestURL()
  • 登录注册窗口(二)
  • go channel 通道
  • 论文阅读:Computational Long Exposure Mobile Photography (二)
  • 基于SSM+小程序的高校寻物平台管理系统(失物1)
  • gerrit 搭建遇到的问题