当前位置: 首页 > news >正文

springboot 多线程实战

先说下业务场景,业务1:基于实时轨迹数据打卡,业务2:基于非实时轨迹的时间差,计算累计时长。 简单点说就是从websocket获取到的实时数据,既要兼容不耗时操作,又要兼容耗时操作。

单线程做的话,一两个用户的数据没问题,用户多了就处理不过来。

实现思路是用TaskExecutor来做,一个task接收从redis lPop的数据,并放入BlockingQueue,另外的task从BlockingQueue获取数据。

@Autowiredprivate TaskExecutor taskExecutor1;@Autowiredprivate TaskExecutor taskExecutor2;@Autowiredprivate TaskExecutor taskExecutor3;static BlockingQueue<TrackHistory> dataQueue = new ArrayBlockingQueue<>(1 << 12);static BlockingQueue<TrackHistory> keepWatchQueue = new ArrayBlockingQueue<>(1 << 12);public static final String M = ":";@Bean("redisReadThread")public String service() {taskExecutor1.execute(() -> {while (true) {try {lPop();} catch (Exception e) {e.printStackTrace();}}});return null;}@Bean("calculationsBusinessData")public void calculationsService() {taskExecutor2.execute(() -> {while (true) {try {if (dataQueue.size() != 0) {TrackHistory trackRealTime = dataQueue.poll();if (trackRealTime == null) {Thread.sleep(100L);} else {//耗时方法doSomething();//存储当前日期+人员的最新位置坐标saveTrackToRedis(trackRealTime);}} else {Thread.sleep(100L);}} catch (Exception e) {log.error("业务1数据计算异常->{}", e.getMessage());}}});}@Bean("calculationsKeepWatch")public void keepWatchService() {taskExecutor3.execute(() -> {while (true) {try {if (keepWatchQueue.size() != 0) {TrackHistory trackRealTime = keepWatchQueue.poll();if (trackRealTime == null) {Thread.sleep(100L);} else {doSomething2(trackRealTime);}} else {Thread.sleep(100L);}} catch (Exception e) {log.error("业务2数据计算异常->{}", e.getMessage());}}});}/*** 从队列中读取数据** @return*/private synchronized void lPop() {Object o = redisTemplate.opsForList().leftPop(RedisKeyCons.COORDINATE);if (!org.springframework.util.StringUtils.isEmpty(o)) {TrackHistory trackRealTime = (TrackHistory) o;log.info("leftPop trackHistory = {}", trackRealTime);if (null != trackRealTime) {if (checkMemberExist(trackRealTime)) {return;}//存储当前日期+人员的最新位置坐标saveTrackToRedisForKeepWatch(trackRealTime);dataQueue.add(trackRealTime);keepWatchQueue.add(trackRealTime);}}}

配置线程池

/*** 线程池配置、启用异步***/
@EnableAsync
@Configuration
public class AsycTaskExecutorConfig {@Bean(name="taskExecutor1")public TaskExecutor taskExecutor1() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(1);taskExecutor.setMaxPoolSize(1);return taskExecutor;}@Bean(name="taskExecutor2")public TaskExecutor taskExecutor2() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();//最大线程数taskExecutor.setMaxPoolSize(5);//核心线程数taskExecutor.setCorePoolSize(5);//任务队列的大小taskExecutor.setQueueCapacity(5);//线程前缀名
//        executor.setThreadNamePrefix();//线程存活时间taskExecutor.setKeepAliveSeconds(60);taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化return taskExecutor;}@Bean(name="taskExecutor3")public TaskExecutor taskExecutor3() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();//最大线程数taskExecutor.setMaxPoolSize(5);//核心线程数taskExecutor.setCorePoolSize(5);//任务队列的大小taskExecutor.setQueueCapacity(5);//线程前缀名
//        executor.setThreadNamePrefix();//线程存活时间taskExecutor.setKeepAliveSeconds(60);taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化return taskExecutor;}@Bean(name = "asyncPoolTaskExecutor")public ThreadPoolTaskExecutor executor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();//核心线程数taskExecutor.setCorePoolSize(10);//线程池维护线程的最大数量,只有在缓冲队列满了之后才会申请超过核心线程数的线程taskExecutor.setMaxPoolSize(10);//缓存队列taskExecutor.setQueueCapacity(15);//设置线程的空闲时间,当超过了核心线程出之外的线程在空闲时间到达之后会被销毁taskExecutor.setKeepAliveSeconds(60);//异步方法内部线程名称taskExecutor.setThreadNamePrefix("async-");/*** 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略* 通常有以下四种策略:* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。* ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功*/taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());taskExecutor.initialize();return taskExecutor;}}

http://www.lryc.cn/news/144890.html

相关文章:

  • 求生之路2社区服务器sourcemod安装配置搭建教程centos
  • 通达OAV12版本,表单及流程,定制开发总结
  • 浅析Linux 物理内存外碎片化
  • C#中的get和set
  • mysql8.0以上忘记密码的重置方法 - window系统
  • 手写Vue3响应式数据原理
  • 基于PIC单片机篮球计分计时器
  • 关于Maxwell与Kafka和数据库的监控
  • 【设计模式】Java设计模式详细讲解
  • 【MySQL】表的增删查改(进阶)
  • Vim几种跳转方式
  • element-ui 弹窗里面嵌套弹窗,解决第二个弹窗被遮罩层掩盖无法显示的问题
  • 【业务功能篇76】微服务网关路由predicates断言条件-filters路由转换地址-跨域问题-多级目录树化层级设计-mybatisPlus逻辑删除
  • apache的ab工具测试网页优化效果速度以及服务器承载
  • 【进阶篇】MySQL 存储引擎详解
  • Spring集成【MyBatis】和【PageHelper分页插件】整合---详细介绍
  • PyCharm下安装配置PySide6开发环境(Qt Designer(打开,编辑)、PyUIC和PyRCC)
  • pytest fixture 创建一个 requests.session() 对象
  • 深入分析负载均衡情景
  • WPF基础入门-Class5-WPF命令
  • 云安全攻防(十三)之 使用minikube安装搭建 K8s 集群
  • Python数据分析 | 各种图表对比总结
  • linux系统(centos、ubuntu、银河麒麟服务、uos、deepin)判断程序是否已安装,通用判断方法:适用所有应用和命令的判断
  • Python3多线程/多进程解决方案(持续更新ing...)
  • 在`CentOS`中安装`Docker Engine`
  • [ VMware 虚拟机 ] 启动不了图形界面,报 “The system is running in low-graphics mode” 错误
  • 如何提高视频清晰度?视频调整清晰度操作方法
  • IO进程线程,文件与目录,实现linux任意目录下ls -la
  • R语言如果列表中有列表,且每个子列表有一个向量:如何转变为仅仅一个列表里面含有向量
  • nrm管理源仓库及发布私人npm包