基于libhv实现的TCP Client Server支持同步,异步传输 (C++11)
基于libhv开源库实现的TCP Client & Server sample, TCP Client支持同步和异步数据传输, 同步使用C++11的future特性实现.
libhv介绍
Like libevent, libev, and libuv, libhv provides event-loop with non-blocking IO and timer, but simpler api and richer protocols.
✨ Features
- Cross-platform (Linux, Windows, macOS, Android, iOS, BSD, Solaris)
- High-performance EventLoop (IO, timer, idle, custom, signal)
- TCP/UDP client/server/proxy
- TCP supports heartbeat, reconnect, upstream, MultiThread-safe write and close, etc.
- Built-in common unpacking modes (FixedLength, Delimiter, LengthField)
- RUDP support: WITH_KCP
- SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
- HTTP client/server (support https http1/x http2 grpc)
- HTTP supports static service, indexof service, forward/reverse proxy service, sync/async API handler
- HTTP supports RESTful, router, middleware, keep-alive, chunked, SSE, etc.
- WebSocket client/server
- MQTT client
异步数据传输实现机制
RpcMessage HTcpClient::SendSyncMessage(RpcMessage& rpc_msg,const int64_t timeout) {if (connect_state_ != kConnected) {LOG(ERROR) << "HTcpClient::SendMessage() - not connected!";return RpcMessage();}std::shared_ptr<MessageFuture> message_future =std::make_shared<MessageFuture>();message_future->timeout_ = timeout;message_future->request_ = rpc_msg;rpc_msg.request_id_ = counter_.GetAndIncrement();{std::lock_guard<std::mutex> lock(mutex_);map_.insert(std::make_pair(rpc_msg.request_id_, message_future));}SendMessage(rpc_msg);try {return message_future->Get(timeout);} catch (TimeoutException& e) {LOG(ERROR) << "HTcpClient::SendSyncMessage() - timeout exception, timeout: "<< timeout;// remove the request from the mapstd::lock_guard<std::mutex> lock(mutex_);size_t erased_count = map_.erase(rpc_msg.request_id_);LOG(INFO) << "HTcpClient::SendSyncMessage() - erased_count: "<< erased_count;return RpcMessage();}
}
HTcpClient 类的成员函数 SendSyncMessage,用于同步发送 RPC 消息并等待响应。
首先,函数检查客户端的连接状态 connect_state_ 是否为已连接状态。如果未连接,则记录错误日志并返回一个空的 RpcMessage 对象。
然后,创建一个 MessageFuture 对象,并设置其超时时间和请求消息。接着,使用 counter_.GetAndIncrement() 为请求消息生成一个唯一的请求 ID。
在一个受互斥锁 mutex_ 保护的代码块中,将请求 ID 和 MessageFuture 对象插入到映射 map_ 中。
然后,调用 SendMessage 方法发送请求消息。
在尝试获取响应时,调用 message_future->Get(timeout) 方法。如果在指定的超时时间内未收到响应,则捕获 TimeoutException 并记录错误日志。然后,再次锁住互斥量并从映射中移除对应的请求 ID,记录被移除的条目数量。
最终,函数返回一个空的 RpcMessage 对象,表示请求失败或超时。
MessageFuture Get实现
RpcMessage MessageFuture::Get(int64_t timeout) {std::future<RpcMessage> future = promise_.get_future();std::future_status status =future.wait_for(std::chrono::milliseconds(timeout));if (status == std::future_status::ready) {return future.get();} else if (status == std::future_status::timeout) {// throw timeout exceptionthrow TimeoutException("timeout exception");}
}
MessageFuture 类的成员函数 Get,用于在指定的超时时间内获取 RpcMessage 对象。
首先,函数通过 promise_.get_future() 获取一个 std::future 对象 future。std::promise 和 std::future 是 C++ 标准库中的同步机制,用于在线程间传递结果。promise_ 是一个 std::promise 对象,它的 get_future 方法返回一个与之关联的 std::future 对象。
接下来,函数调用 future.wait_for(std::chrono::milliseconds(timeout)),等待指定的超时时间(以毫秒为单位)。wait_for 方法返回一个 std::future_status 枚举值,表示 future 的状态。可能的状态包括 std::future_status::ready(表示结果已准备好)和 std::future_status::timeout(表示等待超时)。
如果 status 等于 std::future_status::ready,则调用 future.get() 获取 RpcMessage 对象并返回。get 方法会阻塞当前线程,直到结果可用,并返回存储在 future 中的值。
如果 status 等于 std::future_status::timeout,则抛出一个 TimeoutException 异常,表示等待超时。异常消息为 “timeout exception”。
通过这种方式,Get 函数能够在指定的超时时间内等待并获取 RpcMessage 对象,如果超时则抛出异常。
使用事项
static void onConnection(const TcpClient::TSocketChannelPtr& channel) {if (channel->isConnected()) {LOG(INFO) << "connected";RpcMessage msg;msg.command_ = 100;msg.request_id_ = counter.GetAndIncrement();msg.payload_ = "hello";msg.length_ = msg.payload_.size();// client.SendMessage(msg);std::thread([channel, &msg] {LOG(INFO) << "SendSyncMessage Start";RpcMessage ret = client.SendSyncMessage(msg, 10000);LOG(INFO) << "SendSyncMessage END" << " request_id: " << ret.request_id_<< " payload: " << ret.payload_;}).detach();} else {LOG(INFO) << "disconnected";}
}
同步传输
为什么放在另外的线程中处理?
TCPClient自身会有EventLoopThread, 是在此线程中运行, 而SendAsyncMessage会阻塞此线程, 因此具体数据的处理都放在其它线程中处理; 类似于Linux驱动开发中的中断处理, 分为上半部和下半部.
Reference
基于libhv实现的TCP client和TCP server,支持同步和异步, 帮我点点Star
TCP Client & Server Sample