当前位置: 首页 > news >正文

基于libhv实现的TCP Client Server支持同步,异步传输 (C++11)

基于libhv开源库实现的TCP Client & Server sample, TCP Client支持同步和异步数据传输, 同步使用C++11的future特性实现.

libhv介绍

Like libevent, libev, and libuv, libhv provides event-loop with non-blocking IO and timer, but simpler api and richer protocols.

✨ Features

  • Cross-platform (Linux, Windows, macOS, Android, iOS, BSD, Solaris)
  • High-performance EventLoop (IO, timer, idle, custom, signal)
  • TCP/UDP client/server/proxy
  • TCP supports heartbeat, reconnect, upstream, MultiThread-safe write and close, etc.
  • Built-in common unpacking modes (FixedLength, Delimiter, LengthField)
  • RUDP support: WITH_KCP
  • SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
  • HTTP client/server (support https http1/x http2 grpc)
  • HTTP supports static service, indexof service, forward/reverse proxy service, sync/async API handler
  • HTTP supports RESTful, router, middleware, keep-alive, chunked, SSE, etc.
  • WebSocket client/server
  • MQTT client

异步数据传输实现机制

RpcMessage HTcpClient::SendSyncMessage(RpcMessage& rpc_msg,const int64_t timeout) {if (connect_state_ != kConnected) {LOG(ERROR) << "HTcpClient::SendMessage() - not connected!";return RpcMessage();}std::shared_ptr<MessageFuture> message_future =std::make_shared<MessageFuture>();message_future->timeout_ = timeout;message_future->request_ = rpc_msg;rpc_msg.request_id_ = counter_.GetAndIncrement();{std::lock_guard<std::mutex> lock(mutex_);map_.insert(std::make_pair(rpc_msg.request_id_, message_future));}SendMessage(rpc_msg);try {return message_future->Get(timeout);} catch (TimeoutException& e) {LOG(ERROR) << "HTcpClient::SendSyncMessage() - timeout exception, timeout: "<< timeout;// remove the request from the mapstd::lock_guard<std::mutex> lock(mutex_);size_t erased_count = map_.erase(rpc_msg.request_id_);LOG(INFO) << "HTcpClient::SendSyncMessage() - erased_count: "<< erased_count;return RpcMessage();}
}

HTcpClient 类的成员函数 SendSyncMessage,用于同步发送 RPC 消息并等待响应。
首先,函数检查客户端的连接状态 connect_state_ 是否为已连接状态。如果未连接,则记录错误日志并返回一个空的 RpcMessage 对象。
然后,创建一个 MessageFuture 对象,并设置其超时时间和请求消息。接着,使用 counter_.GetAndIncrement() 为请求消息生成一个唯一的请求 ID。
在一个受互斥锁 mutex_ 保护的代码块中,将请求 ID 和 MessageFuture 对象插入到映射 map_ 中。
然后,调用 SendMessage 方法发送请求消息。
在尝试获取响应时,调用 message_future->Get(timeout) 方法。如果在指定的超时时间内未收到响应,则捕获 TimeoutException 并记录错误日志。然后,再次锁住互斥量并从映射中移除对应的请求 ID,记录被移除的条目数量。
最终,函数返回一个空的 RpcMessage 对象,表示请求失败或超时。

MessageFuture Get实现

RpcMessage MessageFuture::Get(int64_t timeout) {std::future<RpcMessage> future = promise_.get_future();std::future_status status =future.wait_for(std::chrono::milliseconds(timeout));if (status == std::future_status::ready) {return future.get();} else if (status == std::future_status::timeout) {// throw timeout exceptionthrow TimeoutException("timeout exception");}
}

MessageFuture 类的成员函数 Get,用于在指定的超时时间内获取 RpcMessage 对象。
首先,函数通过 promise_.get_future() 获取一个 std::future 对象 future。std::promise 和 std::future 是 C++ 标准库中的同步机制,用于在线程间传递结果。promise_ 是一个 std::promise 对象,它的 get_future 方法返回一个与之关联的 std::future 对象。
接下来,函数调用 future.wait_for(std::chrono::milliseconds(timeout)),等待指定的超时时间(以毫秒为单位)。wait_for 方法返回一个 std::future_status 枚举值,表示 future 的状态。可能的状态包括 std::future_status::ready(表示结果已准备好)和 std::future_status::timeout(表示等待超时)。
如果 status 等于 std::future_status::ready,则调用 future.get() 获取 RpcMessage 对象并返回。get 方法会阻塞当前线程,直到结果可用,并返回存储在 future 中的值。
如果 status 等于 std::future_status::timeout,则抛出一个 TimeoutException 异常,表示等待超时。异常消息为 “timeout exception”。
通过这种方式,Get 函数能够在指定的超时时间内等待并获取 RpcMessage 对象,如果超时则抛出异常。

使用事项

static void onConnection(const TcpClient::TSocketChannelPtr& channel) {if (channel->isConnected()) {LOG(INFO) << "connected";RpcMessage msg;msg.command_ = 100;msg.request_id_ = counter.GetAndIncrement();msg.payload_ = "hello";msg.length_ = msg.payload_.size();// client.SendMessage(msg);std::thread([channel, &msg] {LOG(INFO) << "SendSyncMessage Start";RpcMessage ret = client.SendSyncMessage(msg, 10000);LOG(INFO) << "SendSyncMessage END" << " request_id: " << ret.request_id_<< " payload: " << ret.payload_;}).detach();} else {LOG(INFO) << "disconnected";}
}

同步传输

为什么放在另外的线程中处理?

TCPClient自身会有EventLoopThread, 是在此线程中运行, 而SendAsyncMessage会阻塞此线程, 因此具体数据的处理都放在其它线程中处理; 类似于Linux驱动开发中的中断处理, 分为上半部和下半部.

Reference

基于libhv实现的TCP client和TCP server,支持同步和异步, 帮我点点Star

TCP Client & Server Sample

http://www.lryc.cn/news/599654.html

相关文章:

  • QT开发技术【串口和C++20协程,实现循环发送、暂停、恢复、停止】
  • 上位机知识篇---Jetson Nano的深度学习GPU推理
  • TCP模型,mqtt协议01 day41
  • 【算法-图论】图的存储
  • 嵌入式——C语言:指针①
  • Web攻防-业务逻辑篇密码找回重定向目标响应包检验流程跳过回显泄露验证枚举
  • Go 官方 Elasticsearch 客户端 v9 快速上手与进阶实践*
  • 深度学习day02--神经网络(前三节)
  • 安装本地python文件到site-packages
  • STM32基础知识学习笔记:ICODE、DCODE、DMA等常见名词的解释
  • 【C++详解】模板进阶 非类型模板参数,函数模板特化,类模板全特化、偏特化,模板分离编译
  • 在 .NET 中使用 Base64 时容易踩的坑总结
  • vscode npm run build打包报ELIFECYCLE
  • Linux进程深度解析(2):fork/exec写时拷贝性能优化与exit资源回收机制(进程创建和销毁)
  • 嵌入式学习的第三十五天-进程间通信-HTTP
  • 【论文阅读51】-CNN-LSTM-安全系数和失效概率预测
  • FalconFS: Distributed File System for Large-Scale Deep Learning Pipeline——论文阅读
  • Multiscale Structure Guided Diffusion for Image Deblurring 论文阅读
  • ZYNQ芯片,SPI驱动开发自学全解析个人笔记【FPGA】【赛灵思
  • 秋招Day19 - 分布式 - 理论
  • 【论文阅读】-《GenAttack: Practical Black-box Attacks with Gradient-Free Optimization》
  • 在 Azure 中配置 SMS 与 OTP
  • 高效算法的实现与优化是计算机科学的核心,直接决定了程序的性能和资源消耗。下面针对排序算法、搜索算法和动态规划,深入探讨其高效实现与关键优化技术。
  • 如何用js自动触发deepseek输入并发送,不调用api的情况
  • Java 流(Stream)分类、用途与性能分析
  • 【Web】DASCTF 2025上半年赛 wp
  • 非参数密度函数估计(1)
  • 职坐标解析:人工智能产业现状
  • C#(数据类型)
  • Swagger详解API 文档