当前位置: 首页 > news >正文

java多线程BlockingDeque的三种线程安全正确退出方法

本文介绍两种BlockingDeque在多线程任务处理时正确结束的方法

在这里插入图片描述

一般最开始简单的多线程处理任务过程

  • 把总任务放入BlockingDeque
  • 创建多个线程,每个线程内逻辑时,判断BlockingDeque任务是否处理完,处理完退出,还有任务就BlockingDeque.take()取任务处理
  • 主线程join等待多线程处理完,收尾处理完成任务。

最开始版本代码,10个任务,3个线程来处理

package org.example;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;public class Main3 {public static void main(String[] args){System.out.println("start");BlockingDeque<Integer> task = new LinkedBlockingDeque<>();for (int i = 0; i < 10; i++) {task.add(i);}List<Thread> workers = new ArrayList<>();for (int i = 0; i < 3; i++) {Thread worker = new Thread(()->{while (true) {Integer data = null;try {if (task.size()==0) {System.out.println(Thread.currentThread().getName() +" quit");break;}
//                        Thread.sleep(100); // 默认任务耗时data = task.take();} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() +" do "+ data);}});workers.add(worker);worker.start();}for (Thread worker: workers) {try {worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("job done");}
}

运行之后,感觉非常好,完美实现逻辑

但是当把上面的任务数加到200,线程数加到30,上面线程sleep的注释打开,再次运行,就会发现主进程最后会被一直卡着不结束,说明多线程没有正确判断任务结束,线程不安全

上面的子线程内的size()等于0到下面的BlockingDeque.take()取任务这段之间的代码,这段不是线程安全的

让线程正确判断任务结束,而且要线程安全的三种方法,推荐第二种,兼顾效率和兼容正确性

  • 判断任务结束这段代码加synchronized约束起来,实现线程安全(太慢)
  • 给总任务task内,加入和线程相同数量的停止标志marker
  • 使用BlockingDeque.poll(超时时间) + 异常数据检查(需要检查异常数据)

使用synchronized约束

package org.example;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;public class Main {public static void main(String[] args) {System.out.println("start");BlockingDeque<Integer> task = new LinkedBlockingDeque<>();for (int i = 0; i < 20; i++) {task.add(i);}List<Thread> workers = new ArrayList<>();for (int i = 0; i < 3; i++) {Thread worker = new Thread(()->{while (true) {Integer data = null;synchronized (task) {if (task.size() ==0) {System.out.println(Thread.currentThread().getName() +" quit");break;}try {data = task.take();} catch (InterruptedException e) {throw new RuntimeException(e);}}try {Thread.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() +" do "+ data);}});workers.add(worker);worker.start();}for (Thread worker: workers) {try {worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("job done");}
}

总任务添加stop marker停止标志

package org.example;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;public class Main2 {public static void main(String[] args){System.out.println("start");BlockingDeque<Integer> task = new LinkedBlockingDeque<>();for (int i = 0; i < 20; i++) {task.add(i);}List<Thread> workers = new ArrayList<>();for (int i = 0; i < 3; i++) task.add(99);for (int i = 0; i < 3; i++) {Thread worker = new Thread(()->{while (true) {Integer data = null;try {data = task.take();if (data == 99) {System.out.println(Thread.currentThread().getName() +" quit");break;}} catch (InterruptedException e) {throw new RuntimeException(e);}try {Thread.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() +" do "+ data);}});workers.add(worker);worker.start();}for (Thread worker: workers) {try {worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("job done");}
}

使用BlockingDeque.poll(超时时间),避免了take的永久性等待问题,但是会取到null值,要加判断处理

package org.example;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;public class Main4 {public static void main(String[] args){System.out.println("start");BlockingDeque<Integer> task = new LinkedBlockingDeque<>();for (int i = 0; i < 200; i++) {task.add(i);}List<Thread> workers = new ArrayList<>();for (int i = 0; i < 30; i++) {Thread worker = new Thread(()->{while (true) {Integer data = null;try {if (task.size()==0) {System.out.println(Thread.currentThread().getName() +" quit");break;}Thread.sleep(100); // 默认任务耗时data = task.poll(1000, TimeUnit.MILLISECONDS);if (data == null) {System.out.println(Thread.currentThread().getName() +" get null");continue;}} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() +" do "+ data);}});workers.add(worker);worker.start();}for (Thread worker: workers) {try {worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println("job done");}
}

在这里插入图片描述

http://www.lryc.cn/news/62856.html

相关文章:

  • 从STM32F407到AT32F407(一)
  • 【数据结构】顺序表和链表基本实现(含全代码)
  • CMake : Linux 搭建开发 - g++、gdb
  • 大数据实战 --- 美团外卖平台数据分析
  • 三大本土化战略支点,大陆集团扩大中国市场生态合作「朋友圈」
  • 为什么停更ROS2机器人课程-2023-
  • 【SpringCloud常见面试题】
  • ChatGPT+智能家居在AWE引热议 OpenCPU成家电产业智能化降本提速引擎
  • 拷贝构造函数和运算符重载
  • 本周热门chatGPT之AutoGPT-AgentGPT,可以实现完全自主实现任务,附部署使用教程
  • Mysql 优化LEFT JOIN语句
  • 全栈成长-python学习笔记之数据类型
  • 面试|兴盛优选数据分析岗
  • Redis(08)主从复制master-slave replication
  • 被chatGPT割了一块钱韭菜
  • vue3+ts+pinia+vite一次性全搞懂
  • Apache安装与基本配置
  • 哈夫曼树【北邮机试】
  • thinkphp:数值(保留小数点后N位,四舍五入,左侧补零,格式化货币,取整,生成随机数,数字与字母进行转换)
  • 用Flutter你得了解的七个问题
  • Nmap使用手册
  • 基于ResNet-attention的负荷预测
  • 华为校招机试 - 批量初始化次数(20230426)
  • WhatsApp CRM:通过 CRM WhatsApp 集成向客户发送消息
  • SOLIDWORKS Electrical无缝集成电气和机械设计
  • Numpy从入门到精通——数组变形|合并数组
  • DJ4-5 路由算法:LS 和 DV
  • python图像处理之形态学梯度、礼帽、黑帽
  • 千万级直播系统后端架构设计
  • ImageJ 用户手册——第五部分(菜单命令File,Edit)