学习笔记:无锁队列的原理以及c++实现
本期在讲解无锁队列前先补充一下上一期的一个问题
自旋锁和互斥锁区别
等到策略:自旋锁是在用户态忙等待,互斥锁在内核态休眠
上下文切换:自旋锁无上下文切换,互斥锁有用户态内核态的切换
场景:自旋锁适用持锁时间短,互斥锁用于持锁时间长
回到本期无锁队列的内容中
什么是无锁队列?
简单来说就是与阻塞队列相对的,不用锁而是用原子操作和内存屏障实现线程安全
为什么要无锁队列?
多线程环境下锁的开销较大,同时时间都浪费在保护队列争夺上面而不是执行任务,同时有些情况不能使用锁,所以就有开发无锁队列的需求
锁的局限:
线程阻塞带来的切换
死锁风险
性能瓶颈,高并发情况下锁竞争激烈,吞吐量下降
无锁无等待区别?
进度:无锁至少有一个线程成功,其他可能重试
无等待所有线程必成功,无重试
实现:无锁依赖cas等原子操作
无等待exchange等原子操作
spsc,mpsc,spmc,mpmc?
s指single,p指produce,m指multie前缀,多数,c指comsumer。四个分别指
单生产者单消费者
多生产者单消费者
单生产者多消费者
多生产者多消费者四种情况
Volatile,内存屏障,原子操作间的关系?
volatile:防止编译器优化,确保内存可见性
内存屏障 std::atomic_thread_fence
防止指令重排,保证内存可见性
无锁队列的代码实现
这里我们实现一个ring_buffer的spsc无锁队列实现,ringbuffer指的是这个缓存空间是环状的,但是我们都知道内存空间里是不存在环状的内存的,所以我们设计两个指针,write和read,等指到数组最后时再指回来实现环状。首先我们要知道做一个队列肯定是一个模板类,因为不确定队列中元素的类型。
我们先讨论私有变量
私有变量
private:alignas(64) std::atomic<std::size_t> read_;alignas(64) std::atomic<std::size_t> write_;alignas(64) std::aligned_storage_t<sizeof(T),alignof(T)> buffer_[Capacity];
这里三个变量没有什么讨教,但是声明很有讨教,先看
alignas
alignas
是 C++11 引入的关键字,用于指定变量、类成员或类型的对齐要求。对齐(alignment)指数据在内存中的起始地址需满足特定倍数条件,通常用于优化硬件访问效率(如 SIMD 指令)或满足特定硬件约束(如 DMA 传输)。
这里alignas(64)指的就是这三个变量的其实地址位置必须是64的倍数,那么就有朋友要问了,为什么是64呢,请看下图
一个chcheline的长度是64字节,而读取数据的单位也就是者一个chcheline,如果说两个原子变量在同一个chcheline的话,其中一个更新会连带另一个进行更新,这样会造成程序运行减慢,分开内存对齐更符合高性能的设计目标。
aligned_storage_t
aligned_storage_t
是 C++ 标准库中提供的一个模板类,用于分配具有特定对齐要求的未初始化存储空间。它通常用于实现自定义容器、类型擦除或需要手动管理内存的场景。该模板类定义在 <type_traits>
头文件中。
用人话来说就是初始一个没有类型,没有对齐的存储空间,文中是创建了这样的一个数组空间,方便初始化。
构造和析构函数
template<typename T,std::size_t Capacity>
class RainBuffer {
public:static_assert(Capacity && !(Capacity & (Capacity - 1)),"Capacity 必须是2的n次方");RainBuffer() : read_(0),write_(0){}~RainBuffer() {std::size_t r = read_.load(std::memory_order_relaxed);std::size_t w = write_.load(std::memory_order_relaxed);while(r != w) {reinterpret_cast<T *>(&buffer_[r])->~T();r = (r + 1) & (Capacity - 1);}}
static_assert
静态断言(Static Assertion)是一种在编译时进行检查的机制,用于验证代码中的条件是否满足。如果条件不成立,编译将失败并输出错误信息。静态断言通过编译时检查避免运行时错误,常用于模板元编程、类型检查或常量表达式的验证。
这里就是前置条件Capacity && !(Capacity & (Capacity - 1)不符合是输出后面的内容来保证Capacity是2的倍数,同时这里用了位运算
位运算
位运算就是直接对整数的二进制位进行操作的运算,效率非常高,是计算机底层的基本操作。所以这里将取余改为与运算
运算符 | 名称 | 作用 |
& | 与 | 都是1才是1 |
` | 或 | 有1就是1 |
^ | 异或 | 相同为0不同为1 |
~ | 取反 | 把 0 变 1,1 变 0 |
<< | 左移 | 向左移动 n 位,相当于乘以 2ⁿ |
>> | 右移 | 向右移动 n 位,相当于除以 2ⁿ(有符号/无符号略有区别) |
2的倍数只有首位为1,减1后后位全为1,所以只要不是到了Capacity,与的返回值都是原本的值,到了Capacity返回值为0.
Push
template<typename U>bool Push(U && value) {//万能引用const std::size_t w = write_.load(std::memory_order_relaxed);std::size_t w_next = (w + 1) & (Capacity - 1);if(w_next == read_.load(std::memory_order_acquire)) {return false;}new (&buffer_[w]) T(std::forward<U>(value));write_.store(w_next,std::memory_order_release);return true;}
这里我们留一个位置不放数据作为标兵位,检验是否有位置可以push
这里用模板函数因为不知道传进来的是什么类型,同时用万能引用可以接受左值和右值。
new (&buffer_[w]) T(std::forward<U>(value));
这里使用了placement new语法
new (内存地址) 类型(构造参数...);
这里直接在buffer里进行构造,同时用完美转发保证原来值的身份
完美转发就是需要把参数原封不动地传给另一个函数或构造函数,并希望保持原参数的类型(左值 or 右值)。
这里的内存序也值得讨论下面两个load和store不需解释,最上面的load为什么用relaxed呢,因为咱们是spsc,只有一个生产者线程,所以不需要考虑别的线程也在push,使用relaxed即可
Pop
bool Pop(T& value) {std::size_t r = read_.load(std::memory_order_relaxed);if(r == write_.load(std::memory_order_acquire)) {return false;}value = std::move(*reinterpret_cast<T *>(&buffer_[r]));reinterpret_cast<T *>(&buffer_[r])->~T();read_.store((r + 1) & (Capacity - 1),std::memory_order_release);return true;}
检验是否有数据可以pop后
value = std::move(*reinterpret_cast<T *>(&buffer_[r]));
这里完成的是从队列中“取出”元素并转移所有权到外部变量 value
上,reinterpret则是将buffer里的数据转为T*类型,再用move语义吧数据转成右值给到value,避免拷贝。
size
std::size_t Size() const{const std::size_t r = read_.load(std::memory_order_acquire);const std::size_t w = write_.load(std::memory_order_acquire);return (w >= r) ? (w - r) : (Capacity - r + w);}
size函数获取空闲块数量
更多资料在:https://github.com/0voice查询