当前位置: 首页 > news >正文

FastDDS (SharedMemory)


=========================================================================================
=============================== SharedMemSegment  Start ==========================
// Fast-DDS/src/cpp/utils/shared_memory/SharedMemSegment.hpp

class SharedSegmentBase
{
=============================== 内部类  start ==========================
class Id
{
public:
typedef UUID<8> type;
Id();   // 返回共享内存变量的ID
Id(const Id& other);  // 设置共享内存变量的ID
void generate();  // 给当前Id随机生成一个UUID
private:
type uuid_;
}
=============================== 内部类  stop ==========================
public:
using sharable_lock = boost::interprocess::sharable_lock<M>;
using sharable_mutex = boost::interprocess::interprocess_sharable_mutex;

  using condition_variable = RobustInterprocessCondition;
using mutex = boost::interprocess::interprocess_mutex;
using named_mutex = boost::interprocess::named_mutex;
using spin_wait = boost::interprocess::spin_wait;
static constexpr uint32_t EXTRA_SEGMENT_SIZE = 512;  // 每个共享内存变量需要额外的内存来维护其信息
public:
static deleted_unique_ptr<SharedSegmentBase::named_mutex> open_or_create_and_lock_named_mutex(const std::string& mutex_name); // 创建带名称的进程间互斥变量
void* get_address_from_offset(SharedSegment::Offset offset);  // 返回当前内存块上创建的对象的句柄对应的进程内地址
SharedSegment::Offset get_offset_from_address(void* address);  // 返回当前内存块上创建的对象在进程内地址所对应的对象句柄
private:
std::string name_;  // 共享内存名称    
}

template<typename T, typename U>
class SharedSegment : public SharedSegmentBase
{
public:
typedef T managed_shared_memory_type;  // boost::interprocess::basic_managed_shared_memory<...>或者boost::interprocess::basic_managed_mapped_file<...>
typedef U managed_shared_object_type;     // boost::interprocess::shared_memory_object或者boost::interprocess::file_mapping

    SharedSegment(boost::interprocess::create_only_t, const std::string& name, size_t size);  // 构造函数,创建共享内存块
static void remove(const std::string& name);  // 移除共享内存块(感觉就是unmap操作)
void remove();  // 移除当前SharedSegment的共享内存块
Offset mem_size() const;  // 以字节为单位返回SharedSegment的大小

private:
std::unique_ptr<managed_shared_memory_type> segment_;  // boost::interprocess::basic_managed_shared_memory对象
}

using SharedMemSegment = SharedSegment<boost::interprocess::basic_managed_shared_memory<...>, boost::interprocess::shared_memory_object>
=============================== SharedMemSegment  Stop ==========================

// Fast-DDS/src/cpp/utils/shared_memory/RobustInterprocessCondition.hpp

// 双向列表的节点
struct SemaphoreNode
{
bi::interprocess_semaphore sem {0};   // 跨进程信号量
uint32_t next;                                               // 指向上一个节点的索引
uint32_t prev;                                                 // 指向下一个节点的索引
};

// 双向列表
class SemaphoreList
{
private:
uint32_t head_;    // 指向列表开头SemaphoreNode元素的索引
uint32_t tail_;         // 指向列表结尾SemaphoreNode元素的索引

public:
static constexpr uint32_t LIST_NULL = static_cast<uint32_t>(-1);

void push(uint32_t sem_index, SemaphoreNode* sem_pool);  // 添加元素(的索引)到列表中
uint32_t pop(SemaphoreNode* sem_pool);    // 从列表中移除并且返回尾部的元素的索引
uint32_t tail();        // 返回列表尾部的元素的索引
uint32_t head();      // 返回列表头部的元素的索引
void remove(uint32_t sem_index, SemaphoreNode* sem_pool);  // 将sem_index索引指定的元素从列表中移除
}

// 进程间共享锁
// 原理,在当前主机上创建文件,并且通过flock函数来执行锁定和解锁等操作, 从而完成进程间的锁
// 对于共享锁绑定的文件来说,使用前首先尝试flock(fd, LOCK_EX | LOCK_NB)看下文件是否已经被别的进程上了互斥锁
// 如果当前进程可以上LOCK_EX锁,说明该文件没有被别的进程上互斥锁
// 这样的情况下再通过flock(fd, LOCK_UN | LOCK_NB)上共享锁
class RobustSharedLock
{
public:
RobustSharedLock(std::string& name,...); // 构造函数,其中会直接尝试对文件上锁
~RobustSharedLock();    // 析构函数,其中会对文件的
static bool is_locked(const std::string& name);  // 查询name对应的文件是否被别的进程上了互斥锁
static bool remove(const std::string& name);  // 删除name对应的文件
private:
std::string name_;    // 文件名称
int fd_;                        // 文件描述符

}

// 进程间排他锁(读写锁)
class RobustExclusiveLock
{
// 结构基本等同于 RobustSharedLock 共享锁,只是上锁的flock函数中用的是LOCK_EX,而不是LOCK_SH。
}

// 进程间条件变量
class RobustInterprocessCondition
{
public:
void notify_one();    // 唤醒list_listening_中尾部的listener
void notify_all();  // 唤醒list_listening_中所有的listener

private:
void init_sem_list();  // 初始化    semaphores_pool_,将semaphores_pool_数组初始化为一个顺序的双向列表,每个Node的prev指向上一个Node, next指向下一个Node
uint32_t enqueue_listener();  // 从list_free_中pop一个Node加入到list_listening_中
void dequeue_listener(uint32_t sem_index);    // 从list_listening_中移除下标索引为sem_index的元素,并且将该元素重新push到list_free_的尾部
void do_wait(bi::interprocess_mutex& mut);    // 加入一个listener(SemaphoreNode)到list_listening_,并且在该listener上执行wait(infinite)操作
bool do_timed_wait(const boost::posix_time::ptime& abs_time, bi::interprocess_mutex& mut);  // 加入一个listener(SemaphoreNode)到list_listening_,并且在该listener上执行timed_wait(abs_time)操作

private:
static constexpr uint32_t MAX_LISTENERS = 512;
SemaphoreNode semaphores_pool_[MAX_LISTENERS];   // 一个condition中有512个Listener,每个listener是一个SemaphoreNode
SemaphoreList list_listening_;   // 初始化时list_listening_列表中没有元素
SemaphoreList list_free_;               // 初始化时list_free_拥有semaphores_pool_中所有元素的索引
boost::interprocess::interprocess_mutex semaphore_lists_mutex_;  // 进程间互斥锁,保护list_listening_和list_free_的操作
}

=============================== MultiProducerConsumerRingBuffer  start ==========================
// 环形队列(支持多生产者多消费者),其读写都是无锁的
template <class T>
class MultiProducerConsumerRingBuffer
{
=============================== 内部类  start ==========================
// 环形队列中的数据单元
class Cell
{
public:
void data(const T& data);    // 填充数据
const T& data();    // 返回数据
uint32_t ref_counter() const;  // 返回该单元的引用计数

        std::atomic<uint32_t> ref_counter_;
T data_;   // 单元中保存的数据,一般是BufferDescriptor
}

    // 保存了所属环形队列的写入指针和空余Cell数量
union PtrType
{
struct Pointer
{
uint32_t write_p;    // write_pointer 对环形队列的写入指针
uint32_t free_cells;   // 环形队列中还未使用的Cell
}
ptr;
uint64_t u;
};

    // 环形队列的Listener
class Listener
{
public:
Listener(MultiProducerConsumerRingBuffer<T>& buffer, uint32_t write_p);  // 构造函数
~Listener();      // 析构函数,从RingBuffer中取消当前listener的注册
Cell* head();      // 返回该Listener从RingBuffer中可以读取的第一个Cell单元,同时增加该Cell单元的引用计数
bool pop();          // 标识listener已经读取完了read_p指向的Cell,并且将该Cell的引用计数-1,
// 如果发现该Cell没有任何Listener需要访问了,则将环形队列的free_cells+1,并且将该listener的read_p_向后+1
private:
MultiProducerConsumerRingBuffer<T>& buffer_;    // 对环形队列的引用
uint32_t read_p_;  // read_pointer 该对环形队列的读取指针,如果该listener的read_p和环形队列的write_p相等,说明对于该listener来说,目前没有可以读取的Cell单元
}

    // 保存了所属环形队列的所有属性,包括写入指针,空余Cell数量,Cell总数量以及listener数量
struct Node
{
alignas(8) std::atomic<PtrType> pointer_;
uint32_t total_cells_;  // 标识Node所属环形队列有多少Cell单元

uint32_t registered_listeners_;  // 标识Node所属环形队列有多少Listener
};
=============================== 内部类  stop ==========================
public:
MultiProducerConsumerRingBuffer(Cell* cells_base, uint32_t total_cells);    // RingBuffer构造函数,可以看到所有的Cell单元都是外部传入的,一般这些Cell单元都是在共享内存块上分配的,第二个参数是Cell单元总数量
bool push(constT& data);    // 向RingBuffer中加入数据,如果没有listener,则添加失败,如果当前RingBuffer中free_cells_=0,则添加失败,返回buffer已满,否则,找到空余的Cell,将Data拷贝到该Cell的data_成员中
bool is_buffer_full();    // 如果free_cells_=0,则表示当前环形队列已经满了
bool is_buffer_empty();      // 如果free_cells_ = total_cells_,说明当前环形队列是空的,内部的所有Cell都没有填充数据
std::unique_ptr<Listener> register_listener();    // 向该环形队列注册listener

private:
void unregister_listener(Listener& listener);  // 从环形队列中取消特定listener的注册
static uint32_t get_pointer_value(uint32_t pointer);  // 去掉pointer中最高位的loop flag,返回pointer对应的实际索引,用于在cells_数组中定位

private:
Node* node_;   // Node对象,保存环形队列的状态 (write_p_, free_cells_, total_cells_, listeners_count_)
Cell* cells_;     // 环形队列所拥有的Cell
}
=============================== MultiProducerConsumerRingBuffer  Stop ==========================


=============================== SharedMemGlobal  Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
// 该文件中全局类是SharedMemGlobal


class SharedMemGlobal
{
=============================== 内部类  start ==========================
// BufferDescriptor内部类
struct BufferDescriptor
{
SharedMemSegment::Id source_segment_id;         // BufferDescriptor所对应的共享对象所属的共享内存的ID
SharedMemSegment::Offset buffer_node_offset;        // BufferDescriptor对对应的共享对象的句柄(可以通过boost的接口转换为进程内的地址)
uint32_t validity_id;                        // 有效标志
}

typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Listener Listener;
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell PortCell;

// PortNode内部类
// 注意,PortNode是构建在共享内存中的对象(SharedMemGlobal::init_port函数中创建,对象名为"port_node_abi5",其中5为ABI版本号
// PortNode* port_node = nullptr;
// port_node = segment->get().construct<PortNode>(("port_node_abi" + std::to_string(CURRENT_ABI_VERSION)).c_str())();
// PortNode用来保存Port的状态信息
struct PortNode   
{
SharedMemSegment::Offset buffer;      // Port内的环形队列中的Cell数组在共享内存块中的地址
SharedMemSegment::Offset buffer_node;    // Port内的环形队列中的Node对象在共享内存块中的地址
// 上面两个都是共享内存中对象的句柄,分别代表了环形队列中的Cell数组和Node节点对象,在SharedMemGlobal的init_port中分配
uint32_t port_id;                                         // 端口ID
uint32_t num_listeners;                                // 端口的监听者数量
uint32_t healthy_check_timeout_ms;      // ?
UUID<8> uuid;                                                    // 端口的UUID
SharedMemSegment::condition_variable empty_cv;  // 进程间条件变量  (类型为RobustInterprocessCondition)
SharedMemSegment::mutex empty_cv_mutex;   // 进程间互斥锁   (类型为boost::interprocess::interprocess_mutex),对Port有修改操作的时候需要使用该锁进行互斥(例如Push,Pop,create_listener等)
static constexpr size_t LISTENERS_STATUS_SIZE = 1024;   // 最大允许1024个Listener

struct ListenerStatus   // 每一个Listener的状态
{
uint8_t is_in_use               : 1;  // 标识listener目前是否活跃
uint8_t is_waiting              : 1;  // 标识listener目前是否在等待port上有新的消息
uint8_t is_processing           : 1;  // 标识listener目前是否正在处理port上的消息
BufferDescriptor descriptor;          // 标识listener目前正在处理的消息的BufferDescriptor
};
ListenerStatus listeners_status[LISTENERS_STATUS_SIZE];   // 保存该Port的所有Listener
char domain_name[MAX_DOMAIN_NAME_LENGTH + 1];  // Port所属的域名(?)
}

// Port内部类
class Port
{
private:
std::shared_ptr<SharedMemSegment> port_segment_;  // Port使用到的共享内存
PortNode* node_;   // PortNode成员,保存该Port的listeners和状态
std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor>> buffer_;  // 保存Port上传输数据的环形队列
std::unique_ptr<RobustExclusiveLock> read_exclusive_lock_;   // 跨进程读写锁
std::unique_ptr<RobustSharedLock> read_shared_lock_;                 // 跨进程排他锁
public:
enum class OpenMode   // Port的打开模式
{
ReadShared, (port上可以有多个listener和多个writer,ReadShared和ReadExclusive两种模式是互斥的)
ReadExclusive,  (port上可以有多个Writer,但是只能有一个listener)
Write  (标识该port随时可以进行Write动作)
};
Port(SharedMemSegment* port_segment, PortNode* node, ...);   // 构造函数,保存该Port的PortNode和RingBuffer的Node以及Cell(这些都是在共享内存中创建的对象)
// 增加该Port的引用计数(保存在PortNode中)
~Port();    // 析构函数,将Port的引用计数-1, 
bool try_push(const BufferDescriptor& buffer_descriptor, bool* listeners_active);  // 向Port的环形队列中增加BufferDescriptor,并且返回当前环形队列是否有listener注册了
void wait_pop(Listener& listener, const std::atomic<bool>& is_listener_closed, uint32_t listener_index);  // 让某个该Port上的listener等待直到Port的环形队列中有数据,这个会修改该listener的ListenerStatus的状态
// 并且在PortNode的empty_cv_mutex上wait知道Port上有push后唤醒
bool is_port_ok();    // 标识该Port是否正常工作
uint32_t port_id();  // 返回Port ID
OpenMode open_mode();  // 返回Port的打开模式(读互斥/读共享/任意写)
void close_listener(std::atomic<bool>* is_listener_closed);   // 强制关闭当前Port上的所有listener
void pop(Listener& listener, bool& was_cell_freed);   // 让某个Port的listener弹出可以读取的cell(如果该cell没有listener需要读取,则增加free_cells_),并且将Listener的读取指针往下加一
std::unique_ptr<Listener> create_listener(uint32_t* listener_index);   // 创建该Port的Listener,并且返回该Listener的index
void unregister_listener(std::unique_ptr<Listener>* listener, uint32_t listener_index);  // 取消该Port的listener
bool get_and_remove_blocked_processing(BufferDescriptor& buffer_descriptor);    // 将第一个还在处理bufferdescriptor的listener停止,并且返回其还在处理的BufferDescriptor
void listener_processing_start(uint32_t listener_index, const BufferDescriptor& buffer_descriptor);  // 让指定的listener开始处理某个BufferDescriptor
void listener_processing_stop(uint32_t listener_index);  // 标识某个listener已经完成对BufferDescriptor的处理
void lock_read_exclusive();  // 对Port施加互斥锁(只能有一个listener), RobustExclusiveLock
void lock_read_shared();   // 对Port施加共享锁(可以有多个listener,多个Writer),RobustSharedLock
void unlock_read_locks();   // 释放读锁
}                                                

=============================== 内部类  stop ==========================

static bool is_zombie(uint32_t port_id, const std::string& domain_name);  // 判断某个port_id所属的进程是否已经是僵尸进程了,判断方式:
// 1. 如果可以获取该port_id的互斥锁,并且该互斥锁已经存在
// 2. 如果可以获取该Port_id的共享锁,并且该共享锁已经存在
std::shared_ptr<Port> init_port(uint32_t port_id, std::unique_ptr<SharedMemSegment>& segment, uint32_t max_buffer_descriptors, Port::OpenMode open_mode, uint32_t healthy_check_timeout_ms);
// 1. 在共享内存中创建PortNode的共享对象并且初始化PortNode对象的状态(port_node = segment->get().construct<PortNode>...)
// 2. 在共享内存中创建用于存放BufferDescriptor的RingBuffer的Node和Cell(segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell>,
segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Node>)
// 3. 初始化RingBuffer的Node的状态 (MultiProducerConsumerRingBuffer<BufferDescriptor>::init_node(buffer_node, max_buffer_descriptors);)
// 注意,这个open_port主要用来打开其他进程创建的Port
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_buffer_descriptors, uint32_t healthy_check_timeout_ms, Port::OpenMode open_mode = Port::OpenMode::ReadShared);  // 打开端口, open_port_internal
// 1. 如果该端口之前未被正确关闭,则需要关闭(SharedMemSegment::remove(port_segment_name.c_str());)
// 2. 重新映射该端口的共享内存(new SharedMemSegment(boost::interprocess::open_only, port_segment_name.c_str()));)
// 3. 找到该端口的PortNode(port_node = port_segment->get().find<PortNode>)
// 4. 创建该端口的Port对象(port = std::make_shared<Port>(std::move(port_segment), port_node);)
// 5. 如果打开模式中配置了互斥锁,则对端口加上互斥锁

std::string domain_name_;   // Port的domain名称(FastRtps),会影响Port的共享内存变量的名称(Fastrtps_port27481)
// auto port_segment_name = domain_name_ + "_port" + std::to_string(port_id);

}


=============================== SharedMemManager  Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp

class SharedMemManager
{
=============================== 内部类  start ==========================
// BufferNode 内部类
// BufferNode对象的构建是在共享内存块上创建的,例如: 
// Alloc the buffer nodes
auto buffers_nodes = segment_->get().construct<BufferNode>(boost::interprocess::anonymous_instance)[max_allocations]();

struct BufferNode  // 用于表示一个Buffer的状态
{
struct Status   // 总共占用8字节
{
uint64_t validity_id : 24;            // 
uint64_t enqueued_count : 20;        // Push到Port时增加1,从Port中Pop时递减1
uint64_t processing_count : 20; // 当listener处理该Buffer时对该成员加1,处理完成后-1
};
Status status;        // Buffer状态
uint32_t data_size;        // 数据大小
SharedMemSegment::Offset data_offset;        // 共享业务数据的句柄(通过句柄可以获取该业务数据所占用共享内存在当前进程中的地址)
bool invalidate_buffer();                                // 将该Buffer的状态改为Invalidate    
bool invalidate_if_not_processing();        // 当该Buffer没有被listener处理时,修改器状态为Invalidate
bool dec_enqueued_inc_processing_counts(uint32_t listener_validity_id);        // enqueued_count-1, processing_count+1 ,在SharedMemManager::Listener对该Buffer做pop处理时会调用该函数
bool inc_processing_count(uint32_t listener_validity_id);        // 增加buffer的processing_count
bool inc_enqueued_count(uint32_t listener_validity_id);
bool dec_enqueued_count(uint32_t listener_validity_id);
bool is_not_referenced();
bool dec_processing_count(uint32_t listener_validity_id);
}

// Buffer 内部类
class Buffer
{
public:
virtual void* data() = 0;
virtual uint32_t size() = 0;
}

// SharedMemBuffer 内部类
class SharedMemBuffer : public Buffer
{
public:
SharedMemBuffer(std::shared_ptr<SharedMemSegment>& segment, SharedMemSegment::Id& segment_id, BufferNode* buffer_node, uint32_t original_validity_id);  // 创建SharedMemBuffer
~SharedMemBuffer();  // 析构函数
void* data();        // 返回所包装的bufferNode对应的共享内存中的业务数据在当前进程中的地址
uint32_t size();    // 返回包装的bufferNode对应的共享内存中的业务数据的大小
SharedMemSegment::Offset node_offset();        // 返回所包装的bufferNode对应的数据的句柄(可以跨进程访问,类似于内核句柄)
SharedMemSegment::Id segment_id();        // 返回所依赖的共享内存区间SharedMemSegment的ID
uint32_t validity_id();        // 返回包装的bufferNode的validity_id
void inc_enqueued_count(uint32_t validity_id);  // 递增所包装的bufferNode的enqueued_count
void dec_enqueued_count(uint32_t validity_id);  // 递减所包装的bufferNode的enqueued_count

private:
std::shared_ptr<SharedMemSegment> segment_;        // 用于给业务数据分配共享内存的SharedMemSegment对象指针
void* data_;    // 所包装的bufferNode对应的共享内存中的业务数据在当前进程中的地址
BufferNode* buffer_node_;        // 保存Buffer状态的BufferNode
uint32_t original_validity_id_;   // 构建该Buffer时BufferNode中validity_id的值
}

// Segment 内部类,内部包含BufferNode的集合,代表一段共享内存区间,提供共享内存块的分配和回收,下面对接SharedMemSegment
class Segment
{
public:
Segment(uint32_t size, uint32_t payload_size, uint32_t max_allocations, const std::string& domain_name);    // 构造函数,内部创建SharedMemSegment对象
~Segment();    // 析构函数
SharedMemSegment::Id id();    // 返回依赖的SharedMemSegment对象的Id
std::shared_ptr<Buffer> alloc_buffer(uint32_t size, steady_clock::time_point& max_blocking_time_point);        // 从SharedMemSegment对象中分配共享内存SharedMemBuffer
uint64_t mem_size();        // 返回SharedMemSegment对象映射的共享内存的总大小

private:
std::string segment_name_;  // 所依赖的SharedMemSegment对象的名称
std::list<BufferNode*> free_buffers_;  // 空闲的BufferNode列表
std::list<BufferNode*> allocated_buffers_;  // 已被分配的BufferNode列表
std::mutex alloc_mutex_;        // 分配共享内存时使用的锁
std::shared_ptr<SharedMemSegment> segment_;     // 依赖的SharedMemSegment对象
SharedMemSegment::Id segment_id_;     // 依赖的SharedMemSegment对象的ID
uint64_t overflows_count_;  // 分配共享内存失败的次数
uint32_t free_bytes_;  // 可分配的剩余共享内存大小(单位byte)

private:
void generate_segment_id_and_name(std::string& domain_name);  // 生成SharedMemSegment对象的Name和ID
BufferNode* pop_free_node();    // 从free_buffers_中弹出最后一个可用的BufferNode并且返回
void release_buffer(BufferNode* buffer_node);        // 从SharedMemSegment对象中释放参数中BufferNode申请的共享内存
bool recover_buffers(uint32_t required_data_size);    // 从allocated_buffers_中释放没有被listener使用的BufferNode直到free_bytes_大于required_data_size

}

// Listener 内部类,内部创建SharedMemGlobal::Listener对象用于监听SharedMemGlobal::Port,读取其他进程Port上Push的BufferDescriptor,进而获取其他进程发出的共享数据
class Listener
{
public:
Listener(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port);    // 构造函数,创建SharedMemGlobal::Listener对象
~Listener();        // 析构函数,取消创建SharedMemGlobal::Listener对象对Port的注册
std::shared_ptr<Buffer> pop();    // 从Port中返回可以读取的第一个Buffer
void stop_processing_buffer();  // 通知Port当前Listener的状态改为停止状态
void regenerate_port();        // 当Port状态不OK的时候,重新创建该PortID的Port
void close();        // 关闭当前Listener,这个会接触正在运行的pop调用
private:
std::shared_ptr<SharedMemGlobal::Port> global_port_;     // 要监听的SharedMemGlobal::Port对象
std::unique_ptr<SharedMemGlobal::Listener> global_listener_;   // 内部创建用于监听Port的SharedMemGlobal::Listener对象
uint32_t listener_index_;        // 当前global_listener_在SharedMemGlobal::Port对象的PortNode的Listener容器中的索引
SharedMemManager* shared_mem_manager_;        // 外部传入的SharedMemManager对象,用于根据BufferDescriptor中的segment_id定位到其他进程创建的SharedMemSegment对象,从而访问到对方进程创建的共享Buffer的BufferNode
std::atomic<bool> is_closed_;        // 标识Listener的close状态,这个标志会影响Listener的pop函数,导致其退出等待
}

// Port 内部类,内部包含了SharedMemGlobal::Port对象
// 通过Port类,可以向SharedMemGlobal::Port对象中Push buffer,也可以创建Listener监听该Port
class Port
{
public:
Port(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port, SharedMemGlobal::Port::OpenMode open_mode);        // 构造函数
bool try_push(const std::shared_ptr<Buffer>& buffer);        // 将Buffer送入Port(调用SharedMemGlobal::Port的try_push接口,并且增加BufferNode的enqueue_count)
void recover_blocked_processing();        // 当内部SharedMemGlobal::Port对象变为僵尸Port时,将该Port中所有正在被listener处理的Buffer的processing_count做递减处理(配合regenerate_port一起使用)
// 因为BufferNode是跨进程共享的,如果不讲processing_count,会造成该Buffer始终无法被回收
std::shared_ptr<Listener> create_listener(); // 在内部SharedMemGlobal::Port对象上创建并且返回一个Listener

private:
void regenerate_port();  // 首先释放当前端口中还在读取的Buffer(BufferNode中的process_count -1,一遍让push的一方可以回收buffer),然后重新创建SharedMemGlobal::Port

private:
SharedMemManager* shared_mem_manager_;        // 共享内存对象
std::shared_ptr<SharedMemGlobal::Port> global_port_;
SharedMemGlobal::Port::OpenMode open_mode_;      // 端口打开模式
}

// Port 内部类
class SegmentWrapper
{
// SegmentWrapper的内部类 WatchTask,这是个单例类,用于监控进程中所有的SegmentWrapper
class WatchTask : public SharedMemWatchdog::Task
{
public:
static std::shared_ptr<WatchTask>& get();        // 返回 WatchTask 单例对象
void add_segment(std::shared_ptr<SegmentWrapper> segment);    // 添加要监控的SegmentWrapper
void remove_segment(std::shared_ptr<SegmentWrapper> segment);        // 移除已监控的SegmentWrapper
~WatchTask();            // 析构函数

private:
WatchTask() : watched_it_(watched_segments_.end()), shared_mem_watchdog_(SharedMemWatchdog::get());         // 构造函数
void update_watched_segments();        // 使用to_add_和to_remove_来更新watched_segments_容器中的SegmentWrapper
void run();        // 定时运行的检测函数,调用watched_segments_中每个SegmentWrapper的check_alive进行Alive检测,并且移除非Alive的Segment
private:
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;                // 保存已监控的SegmentWrapper
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;        // watched_segments_容器的迭代器

std::vector<std::shared_ptr<SegmentWrapper>> to_add_;        // add_segment调用会现将要添加的SegmentWrapper放入这个to_add_容器,后面才会进入watched_segments_容器
std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;        // remove_segment调用会现将要移除的SegmentWrapper放入这个to_remove_容器,后面从watched_segments_容器中移除
std::mutex to_add_remove_mutex_;        
}

private:
std::weak_ptr<SharedMemManager> shared_mem_manager_;        // 指向外部类SharedMemManager对象的弱指针
std::shared_ptr<SharedMemSegment> segment_;        // 需要监控的SharedMemSegment指针
SharedMemSegment::Id segment_id_;        // 需要监控的SharedMemSegment的Id
std::string segment_name_;        // 需要监控的SharedMemSegment的名称
std::string lock_file_name_;        // 文件名称为segment名称+"_el"
std::atomic<std::chrono::steady_clock::time_point::rep> last_alive_check_time_;        // 最后一次检查该Wrapper的时间
static constexpr uint32_t ALIVE_CHECK_TIMEOUT_SECS {5};    // 要求是5秒检测一次

private:
bool check_alive();    // 该函数会被WatchTask调用,检查所包含的SharedMemSegment是否已经失效了,如果失效了则让SharedMemManager移除对该SharedMemSegment的映射
bool alive_check_timeout(const std::chrono::steady_clock::time_point& now) const;        // 用于告知WatchTask该Wrapper是否挺长时间没检测,需要检测了,返回true则说明需要检测
void close_and_remove();        // 关闭该SegmentWrapper,并且让SharedMemManager移除对该SharedMemSegment的映射

public:
SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager, std::shared_ptr<SharedMemSegment> segment_, SharedMemSegment::Id segment_id, const std::string& segment_name); // 构造函数
std::shared_ptr<SharedMemSegment> segment();        // 返回所监控的SharedMemSegment对象的指针
void update_alive_time(const std::chrono::steady_clock::time_point& time);        // 更新该Wrapper最后一次被检测的时间点
}
=============================== 内部类  stop ==========================

public:
std::shared_ptr<Segment> create_segment(uint32_t size, uint32_t max_allocations);    // 创建Segment,内部创建SharedMemSegment,并且包含空闲的BufferNode列表和已分配的BufferNode列表
uint32_t segment_allocation_extra_size(uint32_t max_allocations);    // 计算Segment的额外空间 (需要把BufferNode的占用也算进去)
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_descriptors, uint32_t healthy_check_timeout_ms, SharedMemGlobal::Port::OpenMode open_mode);  // 打开Port(不存在的情况下会创建)
void remove_port(uint32_t port_id);    // 移除Port共享内存在当前进程的映射(SharedMemSegment::remove)
SharedMemGlobal* global_segment();    // 返回SharemMemManager内部的SharedMemGlobal对象指针
uint64_t segments_mem();        // 返回内部Segment所包装的SharedMemSegment映射的共享内存区域的大小

private:
std::shared_ptr<Port> regenerate_port(std::shared_ptr<SharedMemGlobal::Port> port,SharedMemGlobal::Port::OpenMode open_mode);  // 根据参数port的port_id重新创建Port
std::shared_ptr<SharedMemSegment> find_segment(SharedMemSegment::Id id);        // 通过Id映射SharedMemSegment到当前进程并且使用SegmentWrapper包装后进行管理(生命周期)
void release_segment(SharedMemSegment::Id id);        // 取消ids_segments_中已经映射的SharedMemSegment
void remove_segments_from_watch();        // 移除所有的SegmentWrapper的Watch
private:
std::mutex ids_segments_mutex_;        // 操作ids_segments_时需要获取的锁
uint64_t segments_mem_;        // 内部Segment所包装的SharedMemSegment映射的共享内存区域的大小
SharedMemGlobal global_segment_;        // SharedMemGlobal对象
std::unordered_map<SharedMemSegment::Id::type, std::shared_ptr<SegmentWrapper>, std::hash<SharedMemSegment::Id::type>> ids_segments_;  // 管理所有打开并且映射到当前进程的SharedMemSegment
}
=============================== SharedMemManager  Stop ==========================


http://www.lryc.cn/news/607366.html

相关文章:

  • webpack面试题及详细答案80题(41-60)
  • C++ 前缀和、双指针
  • Node.js中Buffer的用法
  • 嵌入式第十七课!!!!位运算!!!
  • 考取锅炉司炉工证需要学习哪些专业知识?
  • Linux 用户与组管理:从配置文件到实操命令全解析
  • golang的函数
  • YOLO V11 + BotSort行人追踪定位项目
  • 风光储并离网切换仿真模型(下垂控制一次调频)
  • 详解K8s集群搭建:从环境准备到成功运行
  • 【问题思考总结】CART树如何剪枝?从CART树的生成到剪枝以及为什么CTt一定小于Ct?【图文】
  • 在多租户或多服务共享 Redis 时,如何做逻辑隔离或权限控制?
  • 【数据结构】-----排序的艺术画卷
  • ESD监控系统确保工厂生产设备的静电安全
  • 浏览器【详解】内置Observer(共五种,用于前端监控、图片懒加载、无限滚动、响应式布局、生成安全报告等)
  • cesium FBO(四)自定义相机渲染到Canvas(离屏渲染)
  • 开源工具FossFLOW,绘制技术图表
  • 嵌入式GPU图像渲染工具全景实用指南(i.MX8MP平台)
  • Python深度解析与爬虫进阶:从理论到企业级实践
  • ubuntu 镜像克隆
  • Git 实现原理剖析
  • 【编号394】阿姆河流域土地利用分布数据(1990-2015)
  • 智能问数系统的调研
  • 【工具分享】模拟接口请求响应的Chrome插件ModResponse
  • 什么是doris
  • 第七章 愿景12 小萍分享《人性的弱点》
  • 软件性能优化:善用80-20法则,精准突破瓶颈
  • grafana/lock-stack 日志 Pipeline 配置
  • 前端渲染三国杀:SSR、SPA、SSG
  • npm报错:npm install 出现“npm WARN old lockfile”