深入理解 Linux 多线程
文章目录
- 一、Linux 线程概念
- 1. 什么是线程
- 2. 线程的优点
- 3. 线程的缺点
- 4. 线程异常
- 5. 线程用途
- 二、进程 VS 线程
- 1. 进程和线程
- 三、Linux 线程控制
- 1. POSIX 线程库
- 2. 创建线程
- 3. 线程终止
- 4. 线程等待
- 方法 1️⃣:通过 return 返回值退出
- 方法 2️⃣:通过 pthread_exit 返回值退出
- 方法 3️⃣:通过 pthread_cancel 返回值退出
- 总结
- 5. 线程 ID 及进程地址空间布局
- 用户级线程库创建线程的流程
- 线程调度与运行
- 关于栈空间
- 线程 ID = TCB 地址
- 为什么叫 用户级线程 ?
- 总结
- 四、分离线程
- 1. 方法一
- 2. 方法二
- 3. 总结
- 五、Linux 线程互斥
- 1. 背景概念
- 2. 互斥量 mutex
- 3. 互斥量的接口
- 初始化互斥量
- 销毁互斥量
- 互斥量加锁和解锁
- 4. 改进代码
- 静态方法改进
- 动态方法改进
- 5. 互斥量实现原理探究
- RAII 风格的锁
- 6. 可重入 VS 线程安全
- 概念
- 常见的线程不安全的情况
- 常见的线程安全的情况
- 常见不可重入的情况
- 常见可重入的情况
- 可重入与线程安全联系
- 可重入与线程安全区别
- 六、常见锁概念
- 1. 死锁
- 2. 死锁四个必要条件
- 3. 避免死锁
- 4. 避免死锁算法
- 七、Linux 线程同步
- 1. 条件变量
- 2. 同步概念与竞态条件
- 3. 条件变量函数
- 代码实例
- 4. 为什么 pthread_cond_wait 需要互斥量?
- 5. 条件变量使用规范
- 八、生产者消费者模型
- 1. 基本概念
- 模型核心概念
- 场景类比
- 运行流程
- 总结
- 2. 321原则
- 3. 为何要使用生产者消费者模型
- 4. 生产者消费者模型优点
- 九、基于 BlockingQueue 的生产者消费者模型
- 十、POSIX 信号量
- 1. 基于环形队列的生产消费模型
- 2. 代码实现
- 3. 改进代码(添加任务)
- 十一、线程池
- 十二、线程安全的单例模式
- 1. 什么是单例模式
- 2. 什么是设计模式
- 3. 单例模式的特点
- 4. 饿汉实现方式和懒汉实现方式
- 5. 饿汉方式实现单例模式
- 6. 懒汉方式实现单例模式
- 7. 懒汉方式实现单例模式(线程安全版本)
- 十三、STL,智能指针和线程安全
- 十四、其他常见的各种锁
- 十五、读者写者问题
- 1. 读写锁
- 2. 读写锁接口
- 3. 读写锁案例
0️⃣1️⃣2️⃣3️⃣4️⃣5️⃣6️⃣7️⃣8️⃣
一、Linux 线程概念
1. 什么是线程
在一个程序里的一个执行路线就叫做线程(thread)。
更准确的定义是:线程是 “一个进程内部的控制序列”。
- 一切进程至少都有一个执行线程。
- 线程在进程内部运行,本质是在进程地址空间内运行。
- 在 Linux 系统中,在 CPU 眼中,看到的 PCB 都要比传统的进程更加轻量化。
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流。
如下图所示:
我们首先需要达成一个共识:一个进程的资源可以通过虚拟空间与页表,将部分资源分配给特定的线程。
什么是进程?
从内核角度看,进程是承担系统资源分配的基本实体,它由以下部分组成:多个PCB(进程控制块)、一个虚拟内存、多个页表,以及加载到物理内存中的代码和数据。
在 Linux 中,线程是 CPU 调度的基本单位。
如何理解我们之前学习的进程概念?它与今天所讲的内容是否冲突?
以前学习的进程同样是承担系统资源的基本实体,只不过其内部只有一个执行流;而今天我们所讲的进程,内部可以包含多个执行流。
也就是说,线程在内核层面上,只需在原有进程的框架下创建 PCB,并为其分配该进程的部分资源即可。而 CPU 的调度是 “无差别” 的,它只关注可以调度哪些task_struct
(任务结构体),调度的差异仅在于任务量的轻重。
因此,从 CPU 的角度来看:历史概念中的 “进程”,相当于今天概念中 “进程内的一个分支”。
我们现在给 CPU 的task_struct
,其含义范围小于或等于历史上的task_struct
。也就是说,现在我们给 CPU 的所有task_struct
都是轻量级进程。
理解多线程的一个关键,是明白线程和进程的关系:
家庭:进程(Process)
家庭成员:线程(Thread)
一个 进程 就像一个家庭,是一个资源的 “单位”,比如有自己的房子(内存空间)、厨具(文件句柄)、银行账户(系统资源)等。
一个 线程 就是这个家庭中的成员,他们共享同一个房子(同一块内存)、一起用厨具、用同一个账户,但是每个人可以做不同的事情,比如妈妈做饭,爸爸修水管,孩子写作业。
多线程的意义:
如果家庭成员只有一个(单线程),所有事情只能排队来,比如先做饭,再洗衣服,再打扫卫生,效率低下。
但如果有多个成员(多线程),可以同时并行干活,效率提升:
- 妈妈在厨房做饭(线程1)
- 爸爸在客厅修东西(线程2)
- 孩子在书房写作业(线程3)
他们互相独立,但也有可能抢资源(比如都想用洗衣机),就需要“协调”(线程同步机制)。
2. 线程的优点
1️⃣ 创建一个新线程的代价要比创建一个新进程小得多。
- 创建进程需要完成一系列工作:创建 PCB(进程控制块)、建立地址空间、构建页表、加载代码和数据、建立映射关系,必要时还要打开文件、处理信号等。这些操作都需要逐一执行。
- 而创建线程时,只需创建 PCB,然后直接共享所属进程的资源。因此,相比之下,创建线程的成本要低得多。
2️⃣ 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多。
- 进程之间要切换:PCB、页表、虚拟地址空间、上下文。
- 线程之间要切换:PCB、上下文。
- CPU 内部除了寄存器,还集成了一个 cache,它相当于 CPU 内部的硬件缓存。线程切换 cache 不用太更新,而进程切换需要全部更新。
- 注意:cache 当中经常被当前进程那么访问的,并且以较高的概率命中的这种数据叫热点数据。
3️⃣ 线程占用的资源要比进程少很多。
4️⃣ 能充分利用多处理器的可并行数量。
5️⃣ 在等待慢速 I/O 操作结束的同时,程序可执行其他的计算任务。
6️⃣ 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现。
7️⃣ I/O 密集型应用,为了提高性能,将 I/O 操作重叠。线程可以同时等待不同的 I/O 操作。
3. 线程的缺点
1️⃣ 性能损失
一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。
如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
2️⃣ 健壮性降低
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
3️⃣ 缺乏访问控制
进程是访问控制的基本粒度,在一个线程中调用某些 OS 函数会对整个进程造成影响。
4️⃣ 编程难度提高
编写与调试一个多线程程序比单线程程序困难得多。
4. 线程异常
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃。
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出。
5. 线程用途
- 合理的使用多线程,能提高 CPU 密集型程序的执行效率。
- 合理的使用多线程,能提高 IO 密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)。
二、进程 VS 线程
1. 进程和线程
进程是资源分配的基本单位,线程是调度的基本单位。
线程共享进程数据,但也拥有自己的一部分数据:
- 线程 ID
- 一组寄存器
- 栈
- errno
- 信号屏蔽字
- 调度优先级
进程的多个线程共享同一地址空间,因此 Text Segment
、Data Segment
都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到。
除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(
SIG_ IGN
、SIG_ DFL
或者自定义的信号处理函数) - 当前工作目录
- 用户 ID 和 组 ID
进程和线程的关系如下图:
如何看待之前学习的单进程?很简单,其实是具有一个线程执行流的进程。
1️⃣ Linux 内核中是没有真正意义的线程的,它是用进程 PCB 来模拟线程的,是一种完全属于自己的一套线程方案。
2️⃣ 站在 CPU 的视角,每一个 PCB 都可以称之为轻量级进程。
3️⃣ Linux 线程是 CPU 调度的基本单位,而进程是承担分配资源的基本单位。
4️⃣ 进程用来整体申请资源,而线程用来伸手向进程要资源。
5️⃣ Linux 中没有真正意义的线程,所以真正意义就是:有对应的线程控制块,并且和进程要产生对应的关系。
我们为什么要创建线程呢?
主要原因是未来我们可能会面临着那么一个进程内可能要并行的去执行各种不同的任务,比如:边播放,边下载。
OS 只认线程,程序员也只认线程。Liunx 无法直接提供创建线程的系统调用接口,而只能给我们提供创建轻量级进程的接口。
6️⃣ 用进程PCB来模拟线程的好处是:简单,且维护成本大大降低,可靠高效!
总结:一个进程不能叫做一个 PCB,并且这也不是进程,一个 PCB 最多叫一个执行流,而我们进程叫做:PCB + 地址空间 + 页表 + 为进程创建的一大堆的数据结构(包括加载到内存当中的代码数据)统称为进程。
三、Linux 线程控制
1. POSIX 线程库
与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以 pthread_
打头的。
而要使用这些函数库,要通过引入头文件:<pthread.h>
。
链接这些线程函数库时要使用编译器命令的 -lpthread
选项。
2. 创建线程
功能:创建一个新的线程。
函数原型:
#include <pthread.h>int pthread_create(pthread_t *thread, // 线程 ID(输出)const pthread_attr_t *attr, // 线程属性(可为 NULL 表示默认)void *(*start_routine) (void *), // 线程函数(入口点)void *arg // 传入线程函数的参数
);
参数名 | 含义 |
---|---|
pthread_t *thread | 创建成功后,该指针指向的新线程 ID |
const pthread_attr_t *attr | 线程属性,设为 NULL 使用默认属性 |
void *(*start_routine)(void *) | 线程执行的函数,线程启动后从这里开始运行 |
void *arg | 传递给 start_routine 的参数,可以是任意类型的指针 |
错误检查:
- 传统的一些函数是,成功返回 0,失败返回 -1,并且对全局变量 errno 赋值以指示错误。
- pthreads 函数出错时不会设置全局变量 errno(而大部分其他 POSIX 函数会这样做)。而是将错误代码通过返回值返回。
- pthreads 同样也提供了线程内的 errno 变量,以支持其它使用 errno 的代码。对于 pthreads 函数的错误,建议通过返回值判定,因为读取返回值要比读取线程内的 errno 变量的开销更小。
编写 Makefile
myproc:test.cppg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f myproc
主函数代码
#include <iostream>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>using namespace std;// 新线程执行函数
void* thread_routine(void *args)
{const char* name = (const char*)args;while (true){cout << "I'am new thread, my name is " << name << endl;sleep(1);}
}int main()
{pthread_t tid;// 创建线程int n = pthread_create(&tid, nullptr, thread_routine, (void*)"thread one");assert(0 == n);(void)n;// 主线程while (true){cout << "I'am main thread" << endl;sleep(1);}return 0;
}
运行结果:
另外,可以看到 Linux 下依赖的是原生线程库:
可以看到有两个线程,它们的 PID 都是一样的,但是 LWP 不一样,LWP 是 light weight process 轻量级进程 ID。
其中 LWP 为 31694的是主线程,LWP 为 31695的为新线程。
所以 CPU 调度的时候,是以 LWP 为标识符表示特定一个执行流的。
而当你只有一个执行流的时候,PID 和 LWP 是等价的(即单进程)。
另外,线程一旦被创建,几乎所有的资源都是被所有线程共享的(凡是在地址空间内的,基本都是共享的)
修改代码:
#include <iostream>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>using namespace std;// 该方法被主线程和新线程共享
string func()
{return "I am an independent method";
}// 新线程执行函数
void* thread_routine(void *args)
{const char* name = (const char*)args;while (true){cout << "I'am new thread, my name is " << name << " --> " << func() << endl;sleep(1);}
}int main()
{pthread_t tid;// 创建线程int n = pthread_create(&tid, nullptr, thread_routine, (void*)"thread one");assert(0 == n);(void)n;sleep(1);// 主线程while (true){cout << "I'am main thread" << " --> " << func() << endl;sleep(1);}return 0;
}
运行结果:
线程也一定有自己私有的资源,那么什么资源应该是线程私有的呢?
- PCB 属性私有。
- 要有私有的上下文结构。
- 每一个线程都要有自己独立的栈结构。
一个线程如果出现了异常,会影响其他线程吗?为什么?
代码示例
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>using namespace std;// 新线程执行函数
void* thread_routine(void *args)
{string name = static_cast<const char*>(args); // 安全的进行强制类型转换while (true){cout << "new thread create success! my name is " << name << endl;sleep(1);// 写一个错误代码int* p = nullptr;*p = 0;}
}int main()
{pthread_t tid;// 创建线程int n = pthread_create(&tid, nullptr, thread_routine, (void*)"thread one");assert(0 == n);(void)n;// 主线程while (true){cout << "I'am main thread" << endl;sleep(1);}return 0;
}
可以看到新线程出异常了,主线程没有受到影响,故多线程程序一个线程出异常,会直接影响其他线程的正常运行。说明鲁棒性或者健壮性较差
那么为什么呢?
当你的新线程出现异常时,操作系统会立即识别到该线程发生了硬件错误,例如地址转换失败,或者你试图对没有写入权限的内存空间执行写入操作。
此时,操作系统会通过内存管理单元(MMU)和页表直接触发异常,并迅速定位到是哪个线程或进程导致了这个硬件错误。
上面是以信号的角度来解释的,而今天我想换一个角度来阐述:
当一个线程出现异常并导致崩溃时,操作系统会回收该进程的资源。一旦进程资源被回收,所有线程赖以生存的基础就不复存在,因此所有线程都会随之退出。
3. 线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数 return。这种方法对主线程不适用,从 main 函数 return 相当于调用 exit。
- 线程可以调用
pthread_exit
终止自己。 - 一个线程可以调用
pthread_ cancel
终止同一进程中的另一个线程。
1️⃣ pthread_exit
函数
功能:线程终止
void pthread_exit(void *value_ptr);
参数:
- value_ptr:
value_ptr
不要指向一个局部变量。 - 返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
代码实现
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>using namespace std;// 新线程执行函数
void* thread_routine(void *args)
{string name = static_cast<const char*>(args); // 安全的进行强制类型转换while (true){cout << "new thread create success! my name is " << name << endl;sleep(1);pthread_exit(nullptr);}
}int main()
{pthread_t tid;// 创建线程int n = pthread_create(&tid, nullptr, thread_routine, (void*)"thread one");assert(0 == n);(void)n;// 主线程while (true){cout << "I'am main thread" << endl;sleep(1);}return 0;
}
可以看到,此时新线程已经退出了,而主线程还在继续运行:
需要注意,pthread_exit
或者 return
返回的指针所指向的内存单元必须是全局的或者是用 malloc 分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
2️⃣ pthread_cancel
函数
功能:取消一个执行中的线程
int pthread_cancel(pthread_t thread);
参数:
- thread:线程 ID
- 返回值:成功返回 0,失败返回错误码。
代码示例
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>using namespace std;// 新线程执行函数
void* thread_routine(void *args)
{string name = static_cast<const char*>(args); // 安全的进行强制类型转换while (true){cout << "new thread create success! my name is " << name << endl;sleep(1);}return nullptr;
}int main()
{pthread_t tid;// 创建线程int n = pthread_create(&tid, nullptr, thread_routine, (void*)"thread one");assert(0 == n);(void)n;sleep(1);// 主线程延时3秒后请求取消子线程sleep(3);int m = pthread_cancel(tid); // 请求取消子线程assert(m == 0);cout << "pthread_cancel : " << tid << " success" << endl;return 0;
}
运行结果:
pthread_cancel(tid)
发出取消请求。子线程处于 sleep(1)
时是取消点,因此能立即响应取消请求并退出。
4. 线程等待
为什么需要线程等待?如果不等待,会造成类似僵尸进程的问题:内存泄露!
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
pthread_join
函数
功能:等待线程结束
int pthread_join(pthread_t thread, void **value_ptr);
参数:
- thread:线程 ID
- value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回 0,失败返回错误码。
方法 1️⃣:通过 return 返回值退出
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *thread1(void *arg)
{printf("thread 1 returning ... \n");int *p = (int *)malloc(sizeof(int));*p = 1;return (void *)p;
}int main()
{pthread_t tid;void *ret = nullptr;// thread 1 returnpthread_create(&tid, NULL, thread1, NULL);pthread_join(tid, &ret);printf("thread return, thread id %X, return code: %d\n", tid, *(int *)ret);free(ret);return 0;
}
特点:
- 线程退出时 return 一个值(返回堆上分配的地址)。
pthread_join()
接收到 ret 指针。
最终在主线程中通过 *(int *)ret
打印返回值并 free()
释放内存。
运行结果:
方法 2️⃣:通过 pthread_exit 返回值退出
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *thread2(void *arg)
{printf("thread 2 exiting ...\n");int *p = (int *)malloc(sizeof(int));*p = 2;pthread_exit((void *)p); //与 return 效果等价,只是使用 pthread_exit() 明确表达线程退出。
}int main()
{pthread_t tid;void *ret = nullptr;// thread 2 exitpthread_create(&tid, NULL, thread2, NULL);pthread_join(tid, &ret);printf("thread return, thread id %X, return code: %d\n", tid, *(int *)ret);free(ret);return 0;
}
特点:
- 与 return 效果等价,只是使用
pthread_exit()
明确表达线程退出。 - 同样可被
pthread_join()
捕获到返回值。
运行结果:
方法 3️⃣:通过 pthread_cancel 返回值退出
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *thread3(void *arg)
{while (1){ printf("thread 3 is running ...\n");sleep(1);}return NULL; // 实际不会走到这里
}int main()
{pthread_t tid;void *ret = nullptr;// thread 3 cancel by otherpthread_create(&tid, NULL, thread3, NULL);sleep(3);pthread_cancel(tid);pthread_join(tid, &ret);if (ret == PTHREAD_CANCELED) {printf("thread return, thread id %X, return code: PTHREAD_CANCELED\n", tid);}else {printf("thread return, thread id %X, return code: NULL\n", tid);}return 0;
}
特点:
- 新线程进入死循环,模拟长期运行。
- 主线程调用
pthread_cancel()
请求取消新线程。 - 若新线程在取消点(如
sleep()
)会响应取消并退出。 pthread_join()
返回的ret == PTHREAD_CANCELED
表示被取消。
PTHREAD_CANCELED
是 POSIX 线程库(pthread)中用于表示线程被取消退出的特殊宏值。也就是说,如果一个线程被 pthread_cancel()
请求取消,并且线程在取消点处响应了取消请求,那么:
pthread_join()
返回的void *ret
就会是PTHREAD_CANCELED
(即(void *)-1
)。
运行结果:
总结
pthread_join(pthread_t thread, void **value_ptr)
等待线程结束
调用 pthread_join
函数的线程将挂起等待,直到 tid 为 thread 的线程终止。thread 线程以不同的方法终止,通过 pthread_join
得到的终止状态是不同的,总结如下:
- 如果 thread 线程通过 return 返回,
value_ ptr
所指向的单元里存放的是 thread 线程函数的返回值。 - 如果 thread 线程被别的线程调用
pthread_ cancel
异常终掉,value_ ptr 所指向的单元里存放的是常数PTHREAD_ CANCELED
。 - 如果 thread 线程是自己调用
pthread_exit
终止的,value_ptr 所指向的单元存放的是传给pthread_exit
的参
数。 - 如果对 thread 线程的终止状态不感兴趣,可以传 NULL 给 value_ ptr 参数。
如下图所示:
方式 | 语法 | 可通过 pthread_join 获取返回值 | 典型用途 |
---|---|---|---|
return | return (void*)ptr; | ✅ | 简洁,适合普通返回 |
pthread_exit | pthread_exit((void*)ptr); | ✅ | 显式退出,可搭配清理函数等 |
pthread_cancel | pthread_cancel(tid); | ✅(返回值为 PTHREAD_CANCELED ) | 外部强制终止线程 |
思考一下:为什么没有见到线程退出的时候,对应的退出信号呢?
因为线程退出异常收到信号,整个进程都会退出!换言之,你主线程都退出了,还要退出码有什么意义呢?
故,pthread_join
默认就认为函数会调用成功,不会考虑异常问题,进程才会考虑异常问题!
观点 | 结论 |
---|---|
线程没有退出信号 | ✅ 正确,线程退出是静默行为 |
线程异常会导致整个进程崩溃 | ✅ 正确,信号是进程级别的 |
主线程退出,线程退出码无意义 | ✅ 正确,线程返回值是线程间交互用,不影响进程退出码 |
pthread_join 默认成功,不处理异常 | ✅ 正确,因为线程崩溃就 join 不到了,进程已终止 |
5. 线程 ID 及进程地址空间布局
思考一下:线程打印出来的 tid 的值,究竟是什么呢?
-
pthread_ create
函数会产生一个线程 ID,存放在第一个参数指向的地址中。该线程 ID 和前面说的线程 ID 不是一回事。 -
前面讲的线程 ID 属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
-
pthread_ create
函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程 ID,属于 NPTL 线程库的范畴。线程库的后续操作,就是根据该线程 ID 来操作线程的。
线程库 NPTL 提供了 pthread_ self
函数,可以获得线程自身的 ID:
pthread_t pthread_self(void);
代码示例
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>using namespace std;// 获取线程的tid, 并按十六进制打印
string changeId(const pthread_t &thread_id)
{char tid[128]; snprintf(tid, sizeof(tid), "0x%x", thread_id);return tid;
}// pthread_self()能获取线程ID
void* start_routine(void* args)
{sleep(1);string name = static_cast<char*>(args);while (true){cout << name << changeId(pthread_self()) <<endl;sleep(1);}return nullptr;
}int main()
{// 创建线程pthread_t tid;pthread_create(&tid, nullptr, start_routine, (void*)"new thread 1 running... ");string main_id = changeId(pthread_self()); // 主线程的IDpthread_detach(tid); // 创建线程以后, 立马分离while (true){cout << "main thread running, the id is "<< main_id << "...new thread id is " << changeId(tid) << endl;sleep(1);}return 0;
}
运行结果如下:
pthread_t
到底是什么类型呢?取决于实现。对于 Linux 目前实现的 NPTL 实现而言,pthread_t
类型的线程 ID,本质就是一个进程地址空间上的一个地址。
如下图所示:用户级线程库通过底层 clone
系统调用实现线程创建的全过程
用户级线程库创建线程的流程
1️⃣ 第一步:线程控制块(TCB)分配
当你调用线程创建接口(如 pthread_create()
,或你自己写的库函数),用户线程库会在共享内存区中,分配一个 线程控制块(TCB),这个 TCB 包含:
- 线程 ID(通常是 TCB 的地址)
- 栈空间地址(
child_stack
) - TLS(Thread Local Storage)
- 调用回调函数指针
- 线程状态、优先级、调度参数等
2️⃣ 第二步:调用 clone()
创建轻量级进程
函数原型:
int clone(int (*fn)(void *), void *child_stack,int flags, void *arg,pid_t *ptid, struct user_desc *tls, pid_t *ctid);
用户线程库会调用 clone()
,传入:
fn
:线程执行的起始函数child_stack
:TCB 中分配好的栈地址flags
:如CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD
arg
:传递给fn
的参数,一般是 TCB 的地址
这一步是真正通过 Linux 的内核接口 创建一个 轻量级进程 —— 也就是 Linux 所说的 线程。
线程调度与运行
一旦 clone()
成功返回,新线程开始运行,就会执行 fn(arg)
,这里 arg
就是我们传入的 TCB。
系统会使用 child_stack
所指向的栈空间来维护线程执行过程中的寄存器上下文、返回地址、局部变量等。
因此:所有线程的运行时栈都是在各自 TCB 中维护的私有区域中!
关于栈空间
这些私有栈空间由用户线程库管理,甚至可以做成线程池模式来复用。
线程类型 | 使用的栈位置说明 |
---|---|
主线程 | 使用的是进程原始地址空间的用户栈 |
子线程 | 使用的是分配给其 TCB 的私有栈空间 |
线程 ID = TCB 地址
在用户线程库中:
- 每创建一个线程,库会返回其 线程 ID;
- 实际上是返回的 TCB 地址;
- 因为库中以 TCB 为核心结构,记录了所有线程状态及资源;
- 在调度、终止、同步线程时,都是通过这个 ID 也就是 TCB 地址 来操作。
为什么叫 用户级线程 ?
- 线程调度逻辑、栈分配、同步原语(如锁)都由用户线程库控制;
- 系统只负责最底层的
clone
创建; - 用户线程库可以实现协程、用户态抢占、线程绑定等机制;
- 如果操作系统不感知线程存在,称为 “用户态线程”;
- 如果操作系统协助管理线程(如 NPTL),则是“内核线程”。
总结
关键概念 | 描述 |
---|---|
TCB | 线程控制块,线程的核心元数据 |
child_stack | clone() 使用的私有栈地址 |
clone() | 创建 Linux 内核线程的系统调用 |
用户级线程库 | 管理 TCB、调度、栈分配、线程同步等 |
线程 ID = TCB地址 | 在用户级库中,用 TCB 地址标识线程 |
四、分离线程
默认情况下,新创建的线程是 joinable 的,线程退出后,需要对其进行 pthread_join
操作,否则无法释放资源,从而造成系统泄漏。
如果不关心线程的返回值,join 是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
语法 | 含义 |
---|---|
pthread_detach(thread); | 主线程/其他线程分离某个线程 |
pthread_detach(pthread_self()); | 当前线程自我分离 |
1. 方法一
将某个线程设置为 detached 状态,释放它的 “可连接性”,防止资源泄漏。
int pthread_detach(pthread_t thread);
参数:线程 ID(pthread_t)
返回值:
- 0:成功
- 非 0:错误码,例如:
- EINVAL:线程已经是 detached 状态
- ESRCH:线程不存在
代码示例
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;void* thread_func(void* arg)
{printf("Child thread started (TID: %lu)\n", pthread_self());sleep(2);printf("Child thread finished\n");return nullptr;
}int main()
{pthread_t tid;// 创建线程if (pthread_create(&tid, NULL, thread_func, NULL) != 0) {perror("Failed to create thread");exit(1);}// 分离子线程(主线程不再关心它的返回值)if (pthread_detach(tid) != 0) {perror("Failed to detach thread");exit(1);}printf("Main thread: detached child thread, no join needed.\n");// 等待足够时间让子线程完成sleep(3);printf("Main thread exiting.\n");return 0;
}
运行结果:
2. 方法二
让当前线程变为 detached 状态
pthread_detach(pthread_self());
这在以下情况很常见:
- 在线程函数内部调用,表示线程自我分离。
- 表示无需主线程回收我(不用
pthread_join
),我会在结束时自动清理资源。 - 常用于线程池、守护线程、一次性线程等 “甩手不管” 的情景。
代码示例
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *thread_func(void *arg)
{// 自我分离pthread_detach(pthread_self());printf("Detached thread running...\n");sleep(2);printf("Detached thread exiting...\n");return nullptr;
}int main()
{pthread_t tid;int n = pthread_create(&tid, nullptr, thread_func, nullptr);assert(0 == n);(void)n;// 不用 pthread_join(tid),也不会内存泄漏sleep(3); // 等待子线程退出printf("Main thread exiting.\n");return 0;
}
运行结果:
注意事项:
特性 | 说明 |
---|---|
一旦分离,不能再 join | 否则会报错 EINVAL |
不会导致资源泄漏 | detached 的线程退出时会自动释放其占用的资源 |
不建议主线程随便 detach 子线程 | 不知道子线程状态时,最好还是 join |
常用于线程自己主动调用 | pthread_detach(pthread_self()); 让主线程不用操心资源清理 |
3. 总结
joinable 和分离是冲突的,一个线程不能既是 joinable 又是分离的。
我们创建一个线程让它自我 detach,然后尝试在 main 线程中 pthread_join()
,用于验证 detach 之后再 join 会失败。
代码示例
#include <iostream>
#include <string>
#include <cassert>
#include <cstdio>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
using namespace std;void *thread_func(void *args)
{string name = static_cast<const char*>(args); // 安全的进行强制类型转换// 自我分离pthread_detach(pthread_self());printf("Detached %s...\n", name);return nullptr;
}int main()
{pthread_t tid;int n = pthread_create(&tid, nullptr, thread_func, (void*)"thread 1 run...");assert(0 == n);(void)n;// 让子线程在主线程 join() 前有时间自我 detach,否则竞争条件下可能主线程先 join(),就不会失败。sleep(1); //很重要,要让线程先分离,再等待if (pthread_join(tid, nullptr) == 0) //如果此时主线程调用 pthread_join(tid),会失败,返回 EINVAL。{printf("pthread wait success\n");}else{printf("pthread wait failed, errno = %d (%s)\n", errno, strerror(errno));}return 0;
}
运行结果:pthread_join()
返回错误,因为你试图 join 一个已经 detach 的线程,这是违反 POSIX 规范的,返回值为 EINVAL。
但是更推荐是用法是,在线程创建以后直接分离:
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>using namespace std;// 获取线程的tid, 并按十六进制打印
string changeId(const pthread_t &thread_id)
{char tid[128]; snprintf(tid, sizeof(tid), "0x%x", thread_id);return tid;
}// pthread_self()能获取线程ID
void* start_routine(void* args)
{sleep(1);string name = static_cast<char*>(args);int cnt = 5;while (cnt--){cout << name << changeId(pthread_self()) <<endl;sleep(1);}return nullptr;
}int main()
{// 创建线程pthread_t tid;pthread_create(&tid, nullptr, start_routine, (void*)"new thread 1 running... ");string main_id = changeId(pthread_self()); // 主线程的IDpthread_detach(tid); // 创建线程以后, 立马分离while (true){cout << "main thread running, the id is "<< main_id << "...new thread id is " << changeId(tid) << endl;sleep(1);}return 0;
}
运行结果:
详细总结:
行为 | 结果 |
---|---|
子线程 pthread_detach() | 成为分离线程 |
主线程调用 pthread_join | 返回失败 EINVAL |
sleep(1) | 防止主线程先 join |
errno 值 | 可判断错误类型 |
五、Linux 线程互斥
1. 背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源。
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
2. 互斥量 mutex
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。
我们以售票系统代码为例:
// 操作共享变量会有问题的售票系统代码
#include <iostream>
#include <functional>
#include <string>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
using namespace std;// 共享资源 --- 火车票
int tickets = 1000;void *getTicket(void* args)
{string username = static_cast<char*>(args);while (true){if (tickets > 0){// 休眠的这段的时间模拟真实的抢票花费的时间usleep(1254); // 1s = 1000ms = 1000 000ws// 开始抢cout << username << " 正在进行抢票: " << tickets << endl;tickets--;}else{// 没票的话直接退出break;}}return nullptr;
}int main()
{pthread_t t1, t2, t3, t4;pthread_create(&t1, nullptr, getTicket, (void*)"user 1");pthread_create(&t2, nullptr, getTicket, (void*)"user 2");pthread_create(&t3, nullptr, getTicket, (void*)"user 3");pthread_create(&t4, nullptr, getTicket, (void*)"user 4");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_join(t4, nullptr);return 0;
}
执行结果出现了负数:
为什么可能无法获得正确结果呢?
if
语句判断条件为真以后,代码可以并发的切换到其他线程。usleep
是在模拟漫长业务的过程,而在这个漫长的业务过程中,可能有很多个线程会进入该代码段。--ticket
操作本身就不是一个原子操作。
我们先生成汇编代码:
然后单独取出 ticket--
部分的汇编代码:
代码如下:
400cd8: 8b 05 de 13 20 00 mov 0x2013de(%rip),%eax # 6020bc <tickets>
400cde: 83 e8 01 sub $0x1,%eax
400ce1: 89 05 d5 13 20 00 mov %eax,0x2013d5(%rip) # 6020bc <tickets>
--
操作并不是原子操作,而是对应三条汇编指令:
load
:将共享变量 tickets 从内存加载到寄存器中。update
:更新寄存器里面的值,执行-1
操作。store
:将新值,从寄存器写回共享变量 ticket 的内存地址。
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux 上提供的这把锁叫互斥量。如下图所示:
3. 互斥量的接口
初始化互斥量
初始化互斥量有两种方法:
1️⃣ 静态分配
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER
2️⃣ 动态分配
#include <pthread.h>int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);参数:
- mutex: 要初始化的互斥量
- attr: NULL
销毁互斥量
销毁互斥量需要注意:
- 使用静态方法
PTHREAD_ MUTEX_ INITIALIZER
初始化的互斥量不需要销毁。 - 不要销毁一个已经加锁的互斥量。
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
#include <pthread.h>int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
函数原型
#include <pthread.h>int pthread_mutex_lock(pthread_mutex_t *mutex); // 加锁
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 解锁
调用 pthread_ lock
时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么
pthread_ lock
调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
4. 改进代码
我们基于上面两种方式,来改进刚刚写的售票系统代码。
静态方法改进
代码实现
#include <iostream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>using namespace std;
#include <memory>// 定义一个全局锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 共享资源 --- 火车票
int tickets = 1000;void *getTicket(void* args)
{//sleep(1);string username = static_cast<char*>(args);while (true){pthread_mutex_lock(&lock); // 对临界区进行加锁if (tickets > 0){// 休眠的这段的时间模拟真实的抢票花费的时间usleep(1254); // 1s = 1000ms = 1000 000ws// 开始抢cout << username << " 正在进行抢票: " << tickets << endl;tickets--;pthread_mutex_unlock(&lock); // 对临界区进行解锁}else{// 没票的话直接退出pthread_mutex_unlock(&lock); // 对临界区进行加解锁break;}// 抢完票以后, 有一个形成订单的过程usleep(1000);}return nullptr;
}int main()
{pthread_t t1, t2, t3, t4;pthread_create(&t1, nullptr, getTicket, (void*)"thread 1");pthread_create(&t2, nullptr, getTicket, (void*)"thread 2");pthread_create(&t3, nullptr, getTicket, (void*)"thread 3");pthread_create(&t4, nullptr, getTicket, (void*)"thread 4");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_join(t4, nullptr);return 0;
}
执行结果:可以看到此时就不会出现获取到负数的情况了
动态方法改进
我们对上面的代码进行改动一下,用 ThreadData 结构体来进行线程参数的封装,这样,每个线程启动时都会拿到自己独立的 ThreadData 实例,里面放着它需要的上下文信息。
代码实现
#include <iostream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>using namespace std;class ThreadData
{
public:ThreadData(const string &threadname, pthread_mutex_t *mutex_p) : _threadname(threadname), _mutex_p(mutex_p){}~ThreadData() {}public:string _threadname; // 标识线程的名字(方便日志打印区分是谁在抢票)。pthread_mutex_t *_mutex_p; // 共享互斥锁的指针(保证多个线程对 tickets 的访问互斥)。
};// 共享资源 --- 火车票
int tickets = 1000;void *getTicket(void* args)
{//sleep(1);ThreadData *td = static_cast<ThreadData*>(args);while (true){// 加锁和解锁的过程是多个线程串行执行的, 所以程序变慢了// 锁只规定互斥访问, 没有规定必须让谁优先执行// 锁就是真正的让多个执行流进行竞争的结果pthread_mutex_lock(td->_mutex_p); // 对临界区进行加锁if (tickets > 0){// 休眠的这段的时间模拟真实的抢票花费的时间usleep(1254); // 1s = 1000ms = 1000 000ws// 开始抢cout << td->_threadname << " 正在进行抢票: " << tickets << endl;tickets--;pthread_mutex_unlock(td->_mutex_p); // 对临界区进行解锁}else{// 没票的话直接退出pthread_mutex_unlock(td->_mutex_p); // 对临界区进行加解锁break;}// 抢完票以后, 有一个形成订单的过程usleep(1000);}return nullptr;
}int main()
{
#define NUM 4pthread_mutex_t lock;pthread_mutex_init(&lock, nullptr); // 初始化锁vector<pthread_t> tids(NUM);// 创建线程for (int i = 0; i < NUM; ++ i){char buffer[64];snprintf(buffer, sizeof(buffer), "thread %d", i+1);ThreadData* td = new ThreadData(buffer, &lock);pthread_create(&tids[i], nullptr, getTicket, td);}// 等待线程for (const auto &tid : tids){pthread_join(tid, nullptr);}pthread_mutex_destroy(&lock); // 销毁锁return 0;
}
执行结果:
5. 互斥量实现原理探究
经过上面的例子,相信大家已经意识到单纯的 i++
或者 ++i
都不是原子的,有可能会有数据一致性问题。
如何理解呢?
举个例子:假设现在有线程 A 和线程 B,以及 1000 张票。
线程A:
- 第一步:先从内存中读取数据,即把 1000 加载到 CPU 的寄存器中
- 第二步:再做 tickets--,此时在 CPU 的 ebx 寄存器中就是:1000 -> 999
- 第三步:把 999 返回到内存中tickets变量的位置。但是,很不幸,当线程 A 做完第一步和第二步的时候,线程 A已经被切走了。
寄存器这个硬件只有一份儿,但是寄存器里的内容不属于当前操作系统或者 CPU 的,寄存器里的数据叫做当前线程的上下文,是属于当前线程 A 的。
所以线程 A 除了要把自己切走之外,他还要把自己上下文带走(1000 -> 999)。此时,OS 开始调度线程B:
- 第一步:先从内存中读取数据,即把 1000 加载到 CPU 的寄存器中
- 第二步:再做 tickets--,此时在 CPU 的 ebx 寄存器中就是:1000 -> 999
- 第三步:把 999 返回到内存中tickets变量的位置。至此,线程 B 完成了三步动作,并且没人来抢占他,所以他一直循环了 800 次,此时内存中 tickets 变量的值为200。
当线程 B 想继续的时候,OS 把它切换走了,寄存器硬件只有一份儿,但是寄存器里的数据是属于当前线程 B 的,所以线程 B 它走的时候也要带着自己的上下文走。此时 OS 又把线程 A 给切换回来了,那么线程 A 要在 CPU 的寄存器里面恢复自己的上下文,然后要继续在曾经没有做完的工作基础之上,继续执行第三步:把 999 返回到内存中 tickets 变量的位置。但是别忘了,内存中 tickets 变量的值已经被线程 B 减到 200 了,结果线程 A 又给变成了 999,此时就相当于多线程之间运算的时候发生了对应的干扰问题!
用稍微正规的语言解释为:
为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令。
该指令的作用是把 寄存器 和 内存单元 的数据相交换。由于只有一条指令,保证了原子性。
即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
现在我们把 lock
和 unlock
的伪代码改一下,如下所示:
思考一下:
1️⃣ 如何看待锁呢?
- 锁本身就是一个共享资源!而全局的变量是要被保护的,锁是用来保护全局资源的,锁本身也是全局资源,锁的安全谁来保护呢?
- 所以必须保证
pthread_mutex_lock
加锁的过程是安全的!加锁的过程其实是原子的! - 如果申请成功,就继续向后执行;如果申请暂时不成功,执行流会阻塞。
- 谁持有锁,谁就进入临界区。
2️⃣ 如何理解加锁和解锁的本质呢?
- 加锁的过程是原子的(要么加,要么不加,即加锁以后,未来解锁一定是只有一个执行流的)。
3️⃣ 如果我们想简单的使用,该如何进行封装设计呢?
- 做一个设计,封装一把锁。
RAII 风格的锁
我们最终怎么来理解这个互斥锁?
实际上呢?只需要把互斥锁当成变量,然后在汇编级别给我们提供 CPU 寄存器和内存之间数据交换的方式,我们就能完成互斥的功能。
所以,基于此,我们可以封装一把锁,然后在售票系统代码中使用这把锁即可:
Mutex.hpp
#pragma once// 封装一把锁#include <iostream>
#include <pthread.h>class Mutex
{
public:// 构造函数Mutex(pthread_mutex_t *lock_p = nullptr) : _lock_p(lock_p){}// 加锁void lock(){if (_lock_p) // 如果锁不为空, 那么就加锁{pthread_mutex_lock(_lock_p);}}// 解锁void unlock(){if (_lock_p) // 如果锁不为空, 那么就解锁{pthread_mutex_unlock(_lock_p);}}// 析构函数~Mutex(){}private:pthread_mutex_t *_lock_p;
};//
class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex) :_mutex(mutex){_mutex.lock(); // 在构造函数中进行加锁}~LockGuard(){_mutex.unlock(); // 在析构函数中进行解锁}
private:Mutex _mutex;
};
Test.cpp
#include <iostream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <pthread.h>
#include <unistd.h>
#include <memory>#include "Mutex.hpp"using namespace std;// 定义一个全局锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 共享资源 --- 火车票
int tickets = 1000;void *getTicket(void* args)
{//sleep(1);string username = static_cast<char*>(args);while (true){// 这个的 {} 是不想给 usleep(1000) 加锁{// RAII风格的加锁LockGuard lockguard(&lock); // 一条语句完成了加锁和解锁操作if (tickets > 0){// 休眠的这段的时间模拟真实的抢票花费的时间usleep(1254); // 1s = 1000ms = 1000 000ws// 开始抢cout << username << " 正在进行抢票: " << tickets << endl;tickets--;}else{// 没票的话直接退出break;}}// 抢完票以后, 有一个形成订单的过程usleep(1000);}return nullptr;
}int main()
{pthread_t t1, t2, t3, t4;pthread_create(&t1, nullptr, getTicket, (void*)"thread 1");pthread_create(&t2, nullptr, getTicket, (void*)"thread 2");pthread_create(&t3, nullptr, getTicket, (void*)"thread 3");pthread_create(&t4, nullptr, getTicket, (void*)"thread 4");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_join(t4, nullptr);return 0;
}
执行结果:
6. 可重入 VS 线程安全
概念
- 线程安全:多个线程并发执行同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。
- 一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全的情况
- 不保护共享变量的函数。
- 函数状态随着被调用,状态发生变化的函数。
- 返回指向静态变量指针的函数。
- 调用线程不安全函数的函数。
常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
常见不可重入的情况
- 调用了 malloc/free 函数,因为 malloc 函数是用全局链表来管理堆的。
- 调用了标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构。
- 可重入函数体内使用了静态的数据结构。
常见可重入的情况
- 不使用全局变量或静态变量。
- 不使用 malloc 或者 new 开辟出的空间。
- 不调用不可重入函数。
- 不返回静态或全局数据,所有数据都有函数的调用者提供。
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
- 可重入函数是线程安全函数的一种。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
六、常见锁概念
1. 死锁
死锁是指:在一组进程中的各个进程均占有不会释放的资源,但因互相申请 被其他进程所占用不会释放的资源 而处于的一种永久等待状态。
在多把锁的场景下,我们持有自己的锁不释放,还要对方的锁,并且对方也是如此(持有自己的锁不释放,还要我的锁),此时就容易造成死锁!
2. 死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用。
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放。
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺。
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。
3. 避免死锁
- 破坏死锁的四个必要条件。
- 加锁顺序一致。
- 避免锁未释放的场景。
- 资源一次性分配。
4. 避免死锁算法
- 死锁检测算法(了解)
- 银行家算法(了解)
七、Linux 线程同步
1. 条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
2. 同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
3. 条件变量函数
1️⃣ 初始化
#include <pthread.h>int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
2️⃣ 销毁
#include <pthread.h>int pthread_cond_destroy(pthread_cond_t *cond);
3️⃣ 等待条件满足
#include <pthread.h>int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
4️⃣ 唤醒等待
#include <pthread.h>int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
代码实例
用 条件变量 (pthread_cond_t
) 来控制多个线程 抢票 的流程,主线程不断通过 pthread_cond_broadcast
唤醒所有等待的线程,每个工作线程被唤醒后会打印当前票数,并把 tickets--
。
代码实现
#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>using namespace std;int tickets = 100;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 全局的互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 全局的条件变量void* start_routine(void* args)
{string name = static_cast<const char*>(args);while (true){pthread_mutex_lock(&mutex); // 加锁pthread_cond_wait(&cond, &mutex); // 所有的线程都在cond条件变量下等待cout << name << " -> " << tickets << endl;tickets--;pthread_mutex_unlock(&mutex); // 解锁}
}int main()
{// 通过条件变量来控制线程的执行pthread_t t[5];for (int i = 0; i < 5; ++i){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}// 主线程来唤醒在该条件变量cond下等待的线程while (true){sleep(1);pthread_cond_signal(&cond); // 唤醒单个线程pthread_cond_broadcast(&cond); // 唤醒一批线程cout << "main thread wakeup one thread..." << endl;}for (int i = 0; i < 5; ++i){pthread_join(t[i], nullptr);}return 0;
}
执行结果:
同时我们也可以在 main 函数中修改代码,唤醒一批线程:
// 主线程来唤醒在该条件变量cond下等待的线程while (true){sleep(1);pthread_cond_broadcast(&cond); // 唤醒一批线程cout << "main thread wakeup one thread..." << endl;}
执行结果:
4. 为什么 pthread_cond_wait 需要互斥量?
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护,没有互斥锁就无法安全的获取和修改共享数据。
如图所示:
按照上面的说法,我们设计出如下的代码:
先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了。
代码如下:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{ pthread_mutex_unlock(&mutex); //解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过 pthread_cond_wait(&cond); pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
由于解锁和等待不是原子操作。调用解锁之后,pthread_cond_wait
之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait
将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait
。
所以解锁和等待必须是一个原子操作。
所以 int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex)
进入该函数后,会去看条件量等于 0 不?等于的话,就把互斥量变成 1,直到 cond_ wait
返回,把条件量改成 1,把互斥量恢复成原样。
5. 条件变量使用规范
1️⃣ 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
{pthread_cond_wait(cond, mutex);
}
// 修改条件
pthread_mutex_unlock(&mutex);
2️⃣ 给条件发送信号代码
pthread_mutex_lock(&mutex);
// 设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
八、生产者消费者模型
一个线程频繁的申请锁资源,而导致那么对方长时间得不到资源,那么此时这就叫做饥饿问题!
线程运行的同步的本质是:当我们在进行临界资源访问安全的前提条件下,让多个线程按照一定的顺序进行资源访问,这就叫做线程同步。
1. 基本概念
我们可以用 超市 来形象化理解 生产者 - 消费者模型。
模型核心概念
-
生产者(Producer):负责生产商品,并把商品放到货架(共享资源)。
-
消费者(Consumer):负责从货架上取商品,然后结账离开。
-
缓冲区(Buffer):这里相当于超市的货架,用来存放商品,容量有限。
同步与互斥:
- 货架空了 → 消费者必须等待(防止拿不到东西)。
- 货架满了 → 生产者必须等待(防止堆不下)。
- 同一时刻不能有两个人同时改动货架上的同一个商品(防止数据冲突)。
场景类比
角色/概念 | 超市类比 | 编程模型 |
---|---|---|
生产者 | 供货商、配送员 | 生产线程/任务 |
消费者 | 顾客 | 消费线程/任务 |
缓冲区 | 货架 | 队列(Queue)、缓存区(Buffer) |
同步机制 | 店员指挥补货顺序、顾客排队取货 | 信号量(Semaphore)、条件变量(Condition) |
互斥机制 | 货架一次只能被一个人整理 | 互斥锁(Mutex)、临界区(Critical Section) |
运行流程
生产者补货:
- 供货员(生产者)看货架有空,就补货上去。
- 如果货架满了,就等顾客买走一些再补。
消费者购买:
- 顾客(消费者)进来挑选商品。
- 如果货架空了,就等补货员把货补上。
同步与互斥:
- 同步:补货员和顾客通过 货架状态 来协调 —— 货架空/满就是信号。
- 互斥:同一时刻,不能两个补货员同时往同一个位置放货,也不能一个补货员补货时顾客从同一位置拿货。
总结
- 货架 = 缓冲区
- 供货员 = 生产者
- 顾客 = 消费者
- 排队和等货 = 同步
- 一次只能一个人整理货架 = 互斥
这模型的目标就是 —— 生产和消费互不冲突、效率最大化。
2. 321原则
3 种关系:
- 生产者和生产者(互斥关系)
- 消费者和消费者(互斥关系)
- 生产者和消费者(互斥(保证共享资源的安全性)&&同步)
2 种角色:
- 生产者线程、消费者线程
1 个交易场所:
- 一段特定结构的缓冲区
所以:只要我们想写生成消费模型,我们本质工作其实就是维护 321 原则!
3. 为何要使用生产者消费者模型
生产者消费者模式就是通过一个 容器 来解决 生产者 和 消费者 的 强耦合 问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
4. 生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
如图所示:
九、基于 BlockingQueue 的生产者消费者模型
文章链接:生产者消费者模型
十、POSIX 信号量
POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步。
1️⃣ 初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); 参数:
- pshared: 0 表示线程间共享,非零表示进程间共享
- value: 信号量初始值
2️⃣ 销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t *sem);
3️⃣ 等待信号量
功能: 等待信号量,会将信号量的值减 1 #include <semaphore.h>
int sem_wait(sem_t *sem); //P()
4️⃣ 发布信号量
功能: 发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1。 #include <semaphore.h>
int sem_post(sem_t *sem);//V()
上面关于 生产者 - 消费者 的例子是基于 queue 的,其空间可以动态分配。
那么我们现在可以基于固定大小的 环形队列 重写上面的程序,并使用 POSIX 信号量。
1. 基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性。如图所示:
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
如图所示:
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
2. 代码实现
RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <cassert>
#include <semaphore.h>using namespace std;static const int gcap = 10;template<class T>
class RingQueue
{
private:void P(sem_t &sem){int n = sem_wait(&sem); // 等待信号量, 会将信号量的值减1assert(0 == n);(void)n;}void V(sem_t &sem){int n = sem_post(&sem); // 发布信号量, 表示资源使用完毕, 可以归还资源了, 将信号量值加1。assert(0 == n);(void)n;}public:RingQueue(const int cap = gcap) : _queue(cap), _cap(cap){// 初始化信号量int n = sem_init(&_spaceSem, 0, _cap); // 生产者的信号值设置为_capassert(0 == n);n = sem_init(&_dataSem, 0, 0); // 消费者的信号值设置为0assert(0 == n);(void)n;// 刚开始的时候, 生产者和消费者的下标都是0_productStep = _consumeStep = 0;}// 生产者接口: 向环形队列中投入数据void Push(const T& in){// 1. 先申请信号量(申请到了空间信号量, 意味着我一定能进行正常的生产)P(_spaceSem); // 2. 往下标放入数据_queue[_productStep] = in; _productStep++;// 3. 防止下标越界, 保证是环形的递增_productStep %= _cap;// 4. 把数据资源给消费者V(_dataSem);}// 消费者接口: 从环形队列中拿出数据void Pop(T* out){// 1. 先申请信号量(申请到了空间信号量, 意味着我一定能进行正常的消费)P(_dataSem);// 2. 开始消费(拿走数据)*out = _queue[_consumeStep];_consumeStep++;// 3. 防止下标越界, 保证是环形的递增_consumeStep %= _cap;// 4. 拿走数据以后, 该下标所指向的位置空出来了V(_spaceSem);}~RingQueue(){// 销毁信号量sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}
private:vector<T> _queue; // 用数组来模拟环形队列int _cap; // 队列容量sem_t _spaceSem; // 生产者想生产, 在队列中看中的是空间资源(也就是有没有空的位置)sem_t _dataSem; // 消费者想消费, 在队列中看中的是数据资源(也就是存放有没有数据)// 生产和消费的位置: 其实就是队列中的下标int _productStep;int _consumeStep;
};
Main.cpp
#include "RingQueue.hpp"
#include <pthread.h>
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>// 单生产和单消费// 生产者
void* Producter(void* _rq)
{RingQueue<int>* rq = static_cast< RingQueue<int>* >(_rq);while (true){ // 1. 先生产int data = rand() % 10 + 1;rq->Push(data);cout << "生产完成, 生产的数据是: " << data << endl;}return nullptr;
}// 消费者
void* Consumer(void* _rq)
{sleep(1);RingQueue<int>* rq = static_cast< RingQueue<int>* >(_rq);while (true){// 1. 先消费int data;rq->Pop(&data);cout << "消费完成, 消费的数据是: " << data << endl;sleep(1);}return nullptr;
}int main()
{ srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x98794357); // 种一颗随机数种子RingQueue<int>* rq = new RingQueue<int>();pthread_t p, c;pthread_create(&p, nullptr, Producter, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete rq;return 0;
}
执行结果:当程序运行起来以后,红色部分的结果是一次性打印出来的,刚好是环形队列的容量
3. 改进代码(添加任务)
我们这里还是来创建一个任务,来模拟更加完整的过程。
Task.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <cstdio>using namespace std;// 构建一个任务, 放入阻塞队列中
class Task
{using func_t = function<int(int, int, char)>;
public:Task() {}Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func){}string operator()(){int ret = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, ret);return buffer;}string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}
private:int _x;int _y;char _op;func_t _callback; // 回调方法
};const string oper = "+-*/%";
int mymath(int x, int y, char op)
{int ret = 0;switch (op){case '+':ret = x + y;break;case '-':ret = x - y;break;case '*':ret = x * y;break;case '/':{if (y == 0){cerr << "div zero error!" << endl;ret = -1;}else{ret = x / y;}}break;case '%':{if (y == 0){cerr << "mod zero error!" << endl;ret = -1;}else{ret = x % y;}}break;default:// do nothingbreak;}return ret;
}
Main.cpp
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>// 单生产和单消费派发任务的过程// 生产者
void* Producter(void* _rq)
{RingQueue<Task>* rq = static_cast< RingQueue<Task>* >(_rq);while (true){ // 构建任务int x = rand() % 10;int y = rand() % 5;char op = oper[rand() % oper.size()];Task t(x, y, op, mymath);// 生产任务rq->Push(t);// 输出提示cout << "生产者派发任务: " << t.toTaskString() << endl;sleep(1);}return nullptr;
}// 消费者
void* Consumer(void* _rq)
{sleep(1);RingQueue<Task>* rq = static_cast< RingQueue<Task>* >(_rq);while (true){// 构建任务对象Task t;// 消费任务rq->Pop(&t);string result = t();cout << "消费者消费任务: " << result << endl;sleep(1);}return nullptr;
}int main()
{ srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x98794357); // 种一颗随机数种子RingQueue<Task>* rq = new RingQueue<Task>();pthread_t p, c;pthread_create(&p, nullptr, Producter, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete rq;return 0;
}
执行结果:
十一、线程池
文章链接:手撕线程池
十二、线程安全的单例模式
1. 什么是单例模式
单例模式是一种 “经典的、常用的、常考的” 设计模式。
2. 什么是设计模式
IT 行业这么火,涌入的人很多。俗话说林子大了啥鸟都有,故大佬和菜鸡们两极分化的越来越严重。
为了让菜鸡们不太拖大佬的后腿,于是大佬们针对一些经典的常见的场景,给定了一些对应的解决方案,这个方案就是 设计模式。
3. 单例模式的特点
某些类,只应该具有一个对象(实例),就称之为单例。例如,一个孩子只能有一个父亲。
在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百 G)到内存中,此时往往要用一个单例的类来管理这些数据即可。
4. 饿汉实现方式和懒汉实现方式
- 饿汉方式:吃完饭,立刻洗碗,因为下一顿吃的时候可以立刻拿着碗就能吃饭。
- 懒汉方式:吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗碗。
懒汉方式最核心的思想是 “延时加载”,从而能够优化服务器的启动速度。
5. 饿汉方式实现单例模式
代码示例
template <typename T>
class Singleton
{static T data;
public:static T *GetInstance(){return &data;}
};
只要通过 Singleton 这个包装类来使用 T 对象,则一个进程中只有一个 T 对象的实例。
6. 懒汉方式实现单例模式
代码示例
template <typename T>
class Singleton
{static T *inst;
public:static T *GetInstance(){if (inst == NULL){inst = new T();}return inst;}
};
上面这种方式存在一个严重的问题:线程不安全。
第一次调用 GetInstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例,但是后续再次调用,就没有问题了。
7. 懒汉方式实现单例模式(线程安全版本)
代码示例
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;
public:static T *GetInstance(){if (inst == NULL){lock.lock();if (inst == NULL){inst = new T();}lock.unlock();}return inst;}
};
注意事项:
- 加锁解锁的位置。
- 双重 if 判定, 避免不必要的锁竞争。
- volatile 关键字防止过度优化。
十三、STL,智能指针和线程安全
思考一下:STL 中的容器是否是线程安全的?
不是。原因是,STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。
而且对于不同的容器,加锁方式的不同,性能可能也不同(例如 hash 表的锁表和锁桶)。
因此 STL 默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
智能指针是否是线程安全的?
对于
unique_ptr
,由于只是在当前代码块范围内生效,因此不涉及线程安全问题。
对于shared_ptr
,多个对象需要共用一个引用计数变量,所以会存在线程安全问题。但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证shared_ptr
能够高效,原子的操作引用计数。
十四、其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS 操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
十五、读者写者问题
1. 读写锁
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少,相比较改写,它们读的机会反而高的多。
通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
如下所示:
注意:写独占,读共享,读锁优先级高。
2. 读写锁接口
1️⃣ 设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref); pref 共有 3 种选择:
- PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
- PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
- PTHREAD_RWLOCK_PREFER_READER_NP 一致
- PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
2️⃣ 初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
3️⃣ 销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
4️⃣ 加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
3. 读写锁案例
利用读写锁实现多线程环境下的 “卖票” 与 “读票” 模拟:
- 读线程(大量)不停地读取剩余票数。
- 写线程(少量)不停地卖票(递减票数)。
- 使用 pthread 读写锁 保证数据一致性。
代码实现
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>// 1. 全局变量与读写锁
volatile int ticket = 1000; // ticket 表示剩余票数,初始值为 1000。
pthread_rwlock_t rwlock; // 使用 pthread_rwlock_t 读写锁保护对 ticket 的并发访问,避免数据竞争。// 2. 读线程 reader
/*
- 持续读取票数(只读操作),用 读锁 保证并发读取安全。
- 当票数耗尽后退出。
- 因为是读锁,多个读者线程可以并发访问。
*/
void *reader(void *arg)
{char *id = (char *)arg;while (1){pthread_rwlock_rdlock(&rwlock);if (ticket <= 0){pthread_rwlock_unlock(&rwlock);break;}printf("%s: %d\n", id, ticket);pthread_rwlock_unlock(&rwlock);usleep(1);}return nullptr;
}// 3. 写线程 writer
/*
- 持续减少票数(写操作),用 写锁 保证只有一个写者能修改票数。
- 票卖光后退出循环。
*/
void *writer(void *arg)
{char *id = (char *)arg;while (1){pthread_rwlock_wrlock(&rwlock);if (ticket <= 0){pthread_rwlock_unlock(&rwlock);break;}printf("%s: %d\n", id, --ticket);pthread_rwlock_unlock(&rwlock);usleep(1);}return nullptr;
}// 4. 辅助结构与线程初始化
/*
ThreadAttr 保存线程 id 和线程名。
*/
struct ThreadAttr
{pthread_t tid;std::string id;
};// create_reader_id / create_writer_id 用 ostringstream 拼接线程名字。
std::string create_reader_id(std::size_t i)
{// 利用 ostringstream 进行 string 拼接std::ostringstream oss("thread reader ", std::ios_base::ate);oss << i;return oss.str();
}
std::string create_writer_id(std::size_t i)
{// 利用 ostringstream 进行 string 拼接std::ostringstream oss("thread writer ", std::ios_base::ate);oss << i;return oss.str();
}// init_readers / init_writers 批量创建读写线程。
void init_readers(std::vector<ThreadAttr> &vec)
{for (std::size_t i = 0; i < vec.size(); ++i){vec[i].id = create_reader_id(i);pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());}
}
void init_writers(std::vector<ThreadAttr> &vec)
{for (std::size_t i = 0; i < vec.size(); ++i){vec[i].id = create_writer_id(i);pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());}
}// join_threads 回收所有线程(逆序回收)。
void join_threads(std::vector<ThreadAttr> const &vec)
{// 我们按创建的 逆序 来进行线程的回收for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=vec.rend();++it){pthread_t const &tid = it->tid;pthread_join(tid, nullptr);}
}// 5. 读写锁初始化
/*
- 可以选择 写优先 或 读优先 策略。
- 默认是读优先,可能导致写线程长时间等待(写饥饿)。
*/
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else// 读优先,会造成写饥饿pthread_rwlock_init(&rwlock, nullptr);
#endif
}// 6. 主函数
/*
- 创建 1000 个读线程 + 2 个写线程。
- 写线程负责“卖票”,读线程负责“读票”。
- 主线程等待所有线程结束后退出。
*/
int main()
{// 测试效果不明显的情况下,可以加大 reader_nr// 但也不能太大,超过一定阈值后系统就调度不了主线程了const std::size_t reader_nr = 1000;const std::size_t writer_nr = 2;std::vector<ThreadAttr> readers(reader_nr);std::vector<ThreadAttr> writers(writer_nr);init_rwlock();init_readers(readers);init_writers(writers);join_threads(writers);join_threads(readers);pthread_rwlock_destroy(&rwlock);return 0;
}
执行结果: