linux网络编程之单reactor模型(二)
单 Reactor 单线程模型的主要缺点,简洁列出:
事件处理易阻塞:所有事件处理(包括业务逻辑)都在一个线程中,任一操作阻塞会拖慢整个事件循环。
无法利用多核 CPU:只有一个线程工作,不能发挥多核并行处理能力。
写操作可能阻塞:对端缓冲区满时写操作受限,若处理不当会阻塞主线程。
业务逻辑与 I/O 耦合:缺乏异步处理机制,复杂逻辑会拖慢事件分发。
抗并发能力差:连接数一多或负载升高,性能迅速下降,无法支撑高并发场景。
七、单reactor多线程模型
1、核心思想
为了解决单 Reactor 单线程在高并发场景下的性能瓶颈和阻塞风险,我们需要引入多线程机制,将 I/O 事件的检测与业务处理解耦。通过将耗时任务分发到后台线程池,可以显著提升系统吞吐量和响应能力,充分利用多核 CPU,实现高性能、可扩展的网络服务架构。
单 Reactor 多线程模型的优势:
主线程运行 Reactor(事件分发器):负责监听所有客户端连接的读写/新连接等 I/O 事件。
工作线程池处理任务:当 Reactor 检测到某连接可读后,将任务(如业务逻辑处理、数据解析等)分发给后台线程处理,从而避免主线程阻塞。
高性能 + 高并发:Reactor 线程不做阻塞操作,确保事件响应低延迟;业务逻辑处理并行,提高系统吞吐能力。
2、核心组件
1)Channel:事件通道
Channel
是事件驱动框架中的桥梁组件,连接了底层 epoll
(或 poll/select
)与上层的回调逻辑。它不拥有 fd
,但负责:
保存关注的事件(如读、写)
设置事件的处理回调(如
read_cb_
,write_cb_
)接收来自
epoll_wait
的触发事件revents_
调用对应的回调处理函数
常用函数说明:
EnableReading()
,EnableWriting()
:控制监听哪些事件;
HandleEvent()
:事件触发时由EventLoop
调用,执行回调;
Update()
:将事件变化通知所属EventLoop
,更新 epoll。
.h
class EventLoop;/* 负责在事件分发系统中起到“事件通道”的作用,连接底层的I/O多路复用机制(如epoll、select、poll)和具体的事件处理逻辑 */class Channel {
public:using Callback = std::function<void()>;Channel(EventLoop& loop, int fd);~Channel();void SetReadCallback(Callback cb);void SetWriteCallback(Callback cb);void EnableReading();void EnableWriting();void DisableWriting();void DisableAll();void HandleEvent();int Fd() const;uint32_t Events() const;void SetRevents(uint32_t revents);bool IsRegistered() const;void SetRegistered(bool reg);private:void Update();int fd_;uint32_t events_; // 关注的事件uint32_t revents_; // 实际触发的事件bool registered_;EventLoop& loop_;Callback read_cb_;Callback write_cb_;
};
.cpp
Channel::Channel(EventLoop& loop, int fd): fd_(fd), events_(0), revents_(0), registered_(false), loop_(loop) {}Channel::~Channel() {if (registered_) {loop_.RemoveChannel(this);}
}void Channel::SetReadCallback(Callback cb) {read_cb_ = std::move(cb);
}void Channel::SetWriteCallback(Callback cb) {write_cb_ = std::move(cb);
}void Channel::EnableReading() {events_ |= EPOLLIN;Update();
}void Channel::EnableWriting() {events_ |= EPOLLOUT;Update();
}void Channel::DisableWriting() {events_ &= ~EPOLLOUT;Update();
}void Channel::DisableAll() {events_ = 0;Update();
}void Channel::HandleEvent() {std::cout << "[Channel] fd=" << fd_ << " HandleEvent(), revents=" << revents_ << "\n";if ((revents_ & EPOLLIN) && read_cb_) {std::cout << "[Channel] fd=" << fd_ << " calling read_cb_\n";read_cb_();}if ((revents_ & EPOLLOUT) && write_cb_) {std::cout << "[Channel] fd=" << fd_ << " calling write_cb_\n";write_cb_();}// if ((revents_ & EPOLLIN) && read_cb_) read_cb_();// if ((revents_ & EPOLLOUT) && write_cb_) write_cb_();
}int Channel::Fd() const {return fd_;
}uint32_t Channel::Events() const {return events_;
}void Channel::SetRevents(uint32_t revents) {revents_ = revents;
}bool Channel::IsRegistered() const {return registered_;
}void Channel::SetRegistered(bool reg) {registered_ = reg;
}void Channel::Update() {loop_.UpdateChannel(this);
}
2)EventLoop:事件循环器
EventLoop
是框架的核心执行单元,负责:
执行
epoll_wait
等待事件;分发事件给对应的
Channel::HandleEvent()
;提供线程安全的任务投递(
RunInLoop
,QueueInLoop
);管理跨线程唤醒机制(基于
eventfd
);控制退出和生命周期。
关键机制:每个线程只能拥有一个 EventLoop
实例,保证其线程安全。
Loop()
:是主循环,不断执行epoll_wait
+ 事件分发;
pending_functors_
:任务队列,允许其他线程投递任务到当前 loop;
Wakeup()
:主线程调用该函数可以唤醒epoll_wait
阻塞,执行新任务。
.h
#define MAX_EVENTS 1024/* 负责事件的等待、分发和调度执行*/
class EventLoop {
public:EventLoop();~EventLoop();void Loop(); // 主循环void Quit(); // 退出循环void RunInLoop(std::function<void()> cb); // 若在当前线程直接执行,否则加入队列void QueueInLoop(std::function<void()> cb); // 加入队列(异步执行)void Wakeup(); // 通过 eventfd 唤醒 epoll_waitvoid UpdateChannel(Channel* ch);void RemoveChannel(Channel* ch);bool IsInLoopThread() const;private:void HandleWakeup(); // eventfd 读事件处理器void DoPendingFunctors(); // 执行任务队列中的回调private:int epoll_fd_;int wakeup_fd_; // eventfdstd::unique_ptr<Channel> wakeup_channel_;std::atomic<bool> looping_;std::atomic<bool> quit_;const pid_t thread_id_;std::mutex mutex_;std::vector<std::function<void()>> pending_functors_; // 任务队列
};
.cpp
namespace {
int CreateEventFd() {int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (fd < 0) {std::cerr << "[EventLoop] Failed to create eventfd: " << strerror(errno) << "\n";abort();}return fd;
}
}EventLoop::EventLoop(): epoll_fd_(::epoll_create1(EPOLL_CLOEXEC)),wakeup_fd_(CreateEventFd()),wakeup_channel_(std::make_unique<Channel>(*this, wakeup_fd_)),looping_(false),quit_(false),thread_id_(::gettid()) {assert(epoll_fd_ >= 0);wakeup_channel_->SetReadCallback([this]() { HandleWakeup(); });wakeup_channel_->EnableReading();
}EventLoop::~EventLoop() {::close(epoll_fd_);::close(wakeup_fd_);
}void EventLoop::Loop() {assert(!looping_);looping_ = true;quit_ = false;struct epoll_event events[MAX_EVENTS];while (!quit_) {int n = ::epoll_wait(epoll_fd_, events, MAX_EVENTS, TimerInstance()->WaitTime());if (n < 0 && errno != EINTR) {std::cerr << "[EventLoop] epoll_wait error: " << strerror(errno) << "\n";break;}for (int i = 0; i < n; ++i) {auto* ch = static_cast<Channel*>(events[i].data.ptr);ch->SetRevents(events[i].events); // 必须设置 reventsch->HandleEvent();}DoPendingFunctors();TimerInstance()->HandleTimeout();}looping_ = false;
}void EventLoop::Quit() {quit_ = true;if (!IsInLoopThread()) {Wakeup();}
}void EventLoop::RunInLoop(std::function<void()> cb) {if (IsInLoopThread()) {cb();} else {QueueInLoop(std::move(cb));}
}void EventLoop::QueueInLoop(std::function<void()> cb) {{std::lock_guard<std::mutex> lock(mutex_);pending_functors_.push_back(std::move(cb));}Wakeup();
}void EventLoop::Wakeup() {uint64_t one = 1;::write(wakeup_fd_, &one, sizeof(one));
}void EventLoop::HandleWakeup() {uint64_t one;::read(wakeup_fd_, &one, sizeof(one));
}void EventLoop::DoPendingFunctors() {std::vector<std::function<void()>> functors;{std::lock_guard<std::mutex> lock(mutex_);functors.swap(pending_functors_);}for (const auto& func : functors) {func();}
}void EventLoop::UpdateChannel(Channel* ch) {struct epoll_event ev{};ev.data.ptr = ch;ev.events = ch->Events();int fd = ch->Fd();if (!ch->IsRegistered()) {int ret = ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);std::cout << "[EventLoop] Register fd=" << fd << " to epoll, ret=" << ret << ", errno=" << errno << "\n";ch->SetRegistered(true);} else {::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);}
}void EventLoop::RemoveChannel(Channel* ch) {int fd = ch->Fd();if (ch->IsRegistered()) {::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);ch->SetRegistered(false);}
}bool EventLoop::IsInLoopThread() const {return thread_id_ == ::gettid();
}
3)TcpServer:服务器管理器
TcpServer
是用户使用的主类,核心职责:
启动监听 socket,创建 Acceptor;
创建线程池
EventLoopThreadPool
,并启动各子线程;每当有新连接,调用
AcceptConnection()
进行accept()
;将新连接的 fd 分发给某个子线程中的
EventLoop
;创建对应的
TcpConn
实例管理连接。
通过组合主 loop 和多个子 loop 实现事件分发与连接隔离。
.h
// 多线程 Reactor 架构下的 Echo Server 示例(使用 TcpConn + EventLoop + 线程池)
class TcpServer {
public:TcpServer(EventLoop* loop, const std::string& ip, uint16_t port, int thread_num);void Start();private:void AcceptConnection();void NewConnection(int fd, EventLoop* loop);private:EventLoop* base_loop_;EventLoopThreadPool thread_pool_;std::unique_ptr<Channel> accept_channel_;int listen_fd_;std::string ip_;uint16_t port_;std::map<int, std::shared_ptr<TcpConn>> conns_;
};
.cpp
TcpServer::TcpServer(EventLoop* loop, const std::string& ip, uint16_t port, int thread_num): base_loop_(loop), thread_pool_(loop, thread_num), ip_(ip), port_(port) {}void TcpServer::Start() {thread_pool_.Start();// 创建监听 socketlisten_fd_ = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);int opt = 1;setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port = htons(port_);inet_pton(AF_INET, ip_.c_str(), &addr.sin_addr);::bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));::listen(listen_fd_, SOMAXCONN);accept_channel_ = std::make_unique<Channel>(*base_loop_, listen_fd_);accept_channel_->SetReadCallback([this]() { AcceptConnection(); });accept_channel_->EnableReading();std::cout << "[TcpServer] Listening on " << ip_ << ":" << port_ << "\n";
}void TcpServer::AcceptConnection() {while (true) {sockaddr_in cli_addr;socklen_t len = sizeof(cli_addr);int conn_fd = ::accept4(listen_fd_, reinterpret_cast<sockaddr*>(&cli_addr), &len, SOCK_NONBLOCK);if (conn_fd < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) {break;} else {std::cerr << "[TcpServer] accept4 failed: " << strerror(errno) << "\n";break;}}EventLoop* sub_loop = thread_pool_.GetNextLoop();std::cout << "[TcpServer] Dispatching fd=" << conn_fd << " to sub loop\n";sub_loop->RunInLoop([this, conn_fd, sub_loop]() {std::cout << "[TcpServer] RunInLoop executing NewConnection\n";NewConnection(conn_fd, sub_loop);});}
}void TcpServer::NewConnection(int fd, EventLoop* loop) {auto conn = std::make_shared<TcpConn>(fd, *loop);conn->SetReadCallback([conn]() {std::string msg = conn->GetAllData();if (!msg.empty()) {std::cout << "received: " << msg << std::endl;}TimerInstance()->AddTimeout(1000,[&]() {std::string msg = "wugongzi";conn->Send(msg.data(), msg.size());});});conn->SetCloseCallback([this, fd]() {conns_.erase(fd);std::cout << "[TcpServer] Closed fd=" << fd << "\n";});conns_[fd] = conn;std::cout << "[TcpServer] New connection fd=" << fd << "\n";
}
4)TcpConn:客户端连接对象
TcpConn
封装了一个客户端连接的全部通信逻辑,包括:
与客户端的读写操作;
管理输入输出缓冲区;
设置和执行回调函数(如读、关闭);
持有一个
Channel
对象,注册进EventLoop
。
其设计中使用了 enable_shared_from_this
,保证在回调中获取安全的 shared_ptr
,防止生命周期问题。
关键函数说明:
HandleRead()
:处理读事件,填充ReadBuffer
,触发上层逻辑;
HandleWrite()
:处理写事件,从WriteBuffer
中取数据发送;
Send()
:提供线程安全的异步写入接口;
.h
/**声明 TcpConn 类,同时继承 enable_shared_from_this,方便在回调中安全获取 shared_ptr<TcpConn> 自己的智能指针。*/
class TcpConn : public std::enable_shared_from_this<TcpConn> {
public:// 声明回调函数using ReadCallback = std::function<void()>;using CloseCallback = std::function<void()>;TcpConn(int fd, EventLoop& loop);~TcpConn();// 设置回调函数void SetReadCallback(ReadCallback cb);void SetCloseCallback(CloseCallback cb);// 异步发送数据int Send(const char* data, size_t size);// 获取当前接收缓冲区内的全部数据std::string GetAllData();private:// 内部事件处理函数:读取、写入、关闭连接void HandleRead();void HandleWrite();void Close();private:int fd_;bool closed_;EventLoop& loop_;ReadBuffer input_buffer_; // 读缓冲区WriteBuffer output_buffer_; // 写缓冲区std::shared_ptr<Channel> channel_; // 封装 fd 的 epoll 管理类ReadCallback read_cb_; // 外部注入的回调函数CloseCallback close_cb_;
};
.cpp
TcpConn::TcpConn(int fd, EventLoop& loop): fd_(fd), closed_(false), loop_(loop) {// 创建 Channel 并设置读写回调,注册 EPOLLIN 监听可读事件。channel_ = std::make_shared<Channel>(loop_, fd);channel_->SetReadCallback([this]() { HandleRead(); });channel_->SetWriteCallback([this]() { HandleWrite(); });channel_->EnableReading();
}TcpConn::~TcpConn() {Close();
}void TcpConn::SetReadCallback(ReadCallback cb) {read_cb_ = std::move(cb);
}void TcpConn::SetCloseCallback(CloseCallback cb) {close_cb_ = std::move(cb);
}int TcpConn::Send(const char* data, size_t size) {// 若连接已关闭或无数据,直接返回。if (closed_ || data == nullptr || size == 0) return -1;output_buffer_.Append(reinterpret_cast<const uint8_t *>(data), size);channel_->EnableWriting();return 1;
}std::string TcpConn::GetAllData() {auto data = input_buffer_.GetAllData();if (data.first != nullptr) {// 获取读缓冲区全部有效数据std::string result(reinterpret_cast<char*>(data.first), data.second);// 标记已读的数据input_buffer_.ReadCompleted(data.second);return result;}return "";
}void TcpConn::HandleRead() {int err = 0;// 调用 MessageBuffer::Recv() 使用 readv() 读取数据,读取数据到缓冲区中。int n = input_buffer_.Recv(fd_, &err);if (n > 0 && read_cb_) {read_cb_(); // 触发读取回调逻辑} else if (n == 0 || (n < 0 && err != EAGAIN && err != EWOULDBLOCK)) { // 连接关闭或错误则关闭连接。Close();}
}void TcpConn::HandleWrite() {int err = 0;int n = output_buffer_.Send(fd_, &err);if (n < 0 && err != EAGAIN && err != EWOULDBLOCK) {Close();} else if (output_buffer_.GetPendingSize() == 0) {channel_->DisableWriting();}
}void TcpConn::Close() {if (closed_) return;closed_ = true;channel_->DisableAll();close(fd_);if (close_cb_) close_cb_();
}
5)EventLoopThread:线程封装器
EventLoopThread
封装了一个后台线程和一个与之关联的 EventLoop
。用于在多线程环境中,确保每个线程拥有自己的事件循环。
启动流程:
StartLoop()
启动线程,等待其内部EventLoop
初始化;
ThreadFunc()
中创建EventLoop
实例,并执行loop->Loop()
;提供
InitCallback
,用于线程启动时执行用户自定义初始化逻辑。
确保线程和事件循环生命周期绑定,避免线程逃逸或资源泄露。
.h
class EventLoopThread {
public:using InitCallback = std::function<void(EventLoop*)>;explicit EventLoopThread(InitCallback cb = InitCallback());~EventLoopThread();EventLoop* StartLoop(); // 外部调用启动线程,并获取 loop 指针private:void ThreadFunc(); // 线程函数EventLoop* loop_; // 指向子线程中的 loopstd::thread thread_;std::mutex mutex_;std::condition_variable cond_;bool exiting_;InitCallback callback_;
};
.cpp
EventLoopThread::EventLoopThread(InitCallback cb): loop_(nullptr), exiting_(false), callback_(std::move(cb)) {}EventLoopThread::~EventLoopThread() {exiting_ = true;if (loop_) loop_->Quit(); // 通知 loop 退出if (thread_.joinable()) thread_.join();
}EventLoop* EventLoopThread::StartLoop() {thread_ = std::thread([this]() { ThreadFunc(); });// 等待 loop_ 被初始化std::unique_lock<std::mutex> lock(mutex_);cond_.wait(lock, [this] { return loop_ != nullptr; });return loop_;
}void EventLoopThread::ThreadFunc() {EventLoop loop;if (callback_) callback_(&loop);{std::lock_guard<std::mutex> lock(mutex_);loop_ = &loop;cond_.notify_one();}loop.Loop(); // 子线程进入事件循环loop_ = nullptr;
}
6)EventLoopThreadPool:线程池管理器
EventLoopThreadPool
管理多个 EventLoopThread
,是多线程 Reactor 的核心支持模块。它负责:
初始化并启动多个后台线程;
保存每个线程中的
EventLoop
指针;提供
GetNextLoop()
接口实现轮询调度,用于连接分发。
适配不同的并发场景:可平滑扩展并发处理能力。
若
thread_num == 0
,退化为单 Reactor 单线程模型;否则,主线程负责监听,子线程处理连接读写。
.h
class EventLoopThreadPool {
public:explicit EventLoopThreadPool(EventLoop* base_loop, int num_threads);~EventLoopThreadPool() = default;void Start(); // 创建线程并启动各自的 loopEventLoop* GetNextLoop(); // 获取下一个 EventLoop*private:EventLoop* base_loop_; // 主线程的 loop(用于 0 线程情况)int thread_count_;int next_; // 用于轮询选择 loopstd::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop*> loops_;
};
.cpp
EventLoopThreadPool::EventLoopThreadPool(EventLoop* base_loop, int num_threads): base_loop_(base_loop), thread_count_(num_threads), next_(0) {}void EventLoopThreadPool::Start() {for (int i = 0; i < thread_count_; ++i) {auto thread = std::make_unique<EventLoopThread>();EventLoop* loop = thread->StartLoop();threads_.push_back(std::move(thread));loops_.push_back(loop);}if (thread_count_ == 0) {loops_.push_back(base_loop_);}
}EventLoop* EventLoopThreadPool::GetNextLoop() {if (loops_.empty()) return base_loop_;EventLoop* loop = loops_[next_];next_ = (next_ + 1) % loops_.size(); // 轮询return loop;
}
3、工作流程
✅启动阶段:
在主线程中执行,主要完成服务初始化与监听注册:
执行过程如下:
创建
listen_fd
,设置为非阻塞模式;创建
accept_channel_
,封装listen_fd
;设置其读事件回调为:
TcpServer::HandleAccept()
;将
accept_channel_
注册到主线程的EventLoop
中;启动
EventLoopThreadPool
,每个线程拥有独立的EventLoop
。📌 目标: 主线程负责新连接接入,子线程负责连接 I/O 处理。
✅连接建立阶段:
客户端连接到来 → 内核通知 listen_fd 可读 → 触发 accept 逻辑
执行过程如下:
主线程
epoll_wait()
返回,触发accept_channel_->HandleEvent()
;调用
read_cb_
→TcpServer::HandleAccept()
;调用
accept()
获取conn_fd
;设置
conn_fd
为非阻塞;通过
thread_pool_.GetNextLoop()
选取一个子线程的EventLoop
(如loop1
);向
loop1
提交任务(通过RunInLoop()
):
构造
TcpConn
对象,封装连接生命周期;创建连接的
Channel
并设置其读事件回调为:TcpConn::HandleRead()
;将该
Channel
注册到loop1
的epoll
中。📌 关键点: 每个连接的 I/O 事件都被绑定到一个子线程中独立处理,实现多线程并发。
✅数据收发阶段:
conn_fd 可读 → epoll 通知 → 子线程 loop 触发 TcpConn::HandleRead()
执行过程如下:
EventLoop
中的epoll_wait()
检测到conn_fd
可读;调用
Channel::HandleEvent()
→ 执行TcpConn::HandleRead()
;从
conn_fd
读取数据到input_buffer_
;调用用户注册的
read_cb_
处理数据(如:解析协议、业务逻辑处理);若业务处理耗时,可进一步将任务投递到业务线程池执行;
若需要回应客户端,调用
TcpConn::Send()
。✅数据发送阶段:
TcpConn::Send(data, size)
执行过程如下:
如果当前线程是所属
EventLoop
线程:
直接调用
SendInLoop()
;否则通过
RunInLoop()
投递到正确线程执行;
SendInLoop()
内部逻辑:
如果
fd
可写且output_buffer_
为空,尝试直接调用write()
发送;若未能一次发送完:
将剩余数据写入
output_buffer_
;注册
EPOLLOUT
写事件;设置
channel_->SetWriteCallback()
。