当前位置: 首页 > news >正文

C++服务器 支持http、tcp protobuf、websocket,linux开源框架 零依赖轻松编译部署 Reactor

开源地址: https://github.com/crust-hub/tubekit/tree/main
Github:https://github.com/gaowanlu
诚招有兴趣的小伙伴加入开发维护

Tubekit

The C++ TCP server framework based on the Reactor model continues to implement POSIX thread pool, Epoll, non blocking IO, object pool, log, socket network programming, support the dynamic library to implement custom protocol extensions, and use http parser to process http requests. Currently only supports Linux systems

Platform: Linux
Protocol: HTTP TCP Stream(Protobuf) WebSocket

Get Start

prepare

$ sudo apt update
$ sudo apt install protobuf-compiler libprotobuf-dev
$ apt install g++ cmake make
$ git clone https://github.com/crust-hub/tubekit.git

Build

$ cd tubekit
$ cd protocol
$ make
$ cd ..
$ cmake .
$ make -j3

Config

$ vim bin/config/main.ini

Run

$ chmod +x ./run.sh
$ ./run.sh

Stop

$ chmod +x ./kill.sh
$ ./kill.sh

App

support tcp keep-alive stream (protobuf) and http app (http-parser)、websocket

Directory Structure

Directory Structure Link

Third Party

@http-parser
@lua

HTTP样例

#include "app/http_app.h"
#include <string>
#include <vector>
#include <filesystem>
#include <tubekit-log/logger.h>#include "utility/mime_type.h"
#include "utility/url.h"using std::string;
using std::vector;
using tubekit::app::http_app;
using tubekit::connection::http_connection;
namespace fs = std::filesystem;
namespace utility = tubekit::utility;class html_loader
{
public:static string load(string body){static string frame1 = "<!DOCTYPE html>\<html>\<head>\<title></title>\</head>\<body>";static string frame2 = "</body>\</html>";return frame1 + body + frame2;}static string a_tag(string url, string text){string frame = "<a href=\"" + url + "\">" + text + "</a></br>";return frame;}
};void http_app::process_connection(tubekit::connection::http_connection &m_http_connection)
{m_http_connection.m_buffer.set_limit_max(202300);// load callbackm_http_connection.destory_callback = [](http_connection &m_connection) -> void{if (m_connection.ptr){FILE *file = (FILE *)m_connection.ptr;::fclose(file);m_connection.ptr = nullptr;}};m_http_connection.process_callback = [](http_connection &connection) -> void{string url = utility::url::decode(connection.url);auto find_res = url.find("..");if (std::string::npos != find_res){connection.set_response_end(true);return;}const string prefix = "/";fs::path t_path;if (url.empty() || url[0] != '/'){t_path = prefix + url;}else{t_path = url;}if (fs::exists(t_path) && fs::status(t_path).type() == fs::file_type::regular){std::string mime_type;try{mime_type = utility::mime_type::get_type(t_path.string());}catch (...){mime_type = "application/octet-stream";}std::string response = "HTTP/1.1 200 OK\r\nServer: tubekit\r\n";response += "Content-Type: ";response += mime_type + "\r\n\r\n";try{connection.m_buffer.write(response.c_str(), response.size());}catch (const std::runtime_error &e){LOG_ERROR(e.what());}connection.ptr = nullptr;connection.ptr = ::fopen(t_path.c_str(), "r");if (connection.ptr == nullptr){connection.set_response_end(true);return;}// Write when the contents of the buffer have been sent write_end_callback will be executed,// and the response must be set response_end to true, then write after write_end_callback will be continuously recalledconnection.write_end_callback = [](http_connection &m_connection) -> void{char buf[202300] = {0};int len = 0;len = ::fread(buf, sizeof(char), 202300, (FILE *)m_connection.ptr);if (len > 0){try{m_connection.m_buffer.write(buf, len);}catch (const std::runtime_error &e){LOG_ERROR(e.what());}}else{m_connection.set_response_end(true);}};return;}if (fs::exists(t_path) && fs::status(t_path).type() == fs::file_type::directory){connection.ptr = nullptr;const char *response = "HTTP/1.1 200 OK\r\nServer: tubekit\r\nContent-Type: text/html; charset=UTF-8\r\n\r\n";try{connection.m_buffer.write(response, strlen(response));}catch (const std::runtime_error &e){LOG_ERROR(e.what());}//  generate dir listvector<string> a_tags;for (const auto &dir_entry : fs::directory_iterator{t_path}){std::string sub_path = dir_entry.path().string(); //.substr(prefix.size());a_tags.push_back(html_loader::a_tag(utility::url::encode(sub_path), sub_path));}string body;for (const auto &a_tag : a_tags){body += a_tag;}string html = html_loader::load(body);try{connection.m_buffer.write(html.c_str(), html.size());}catch (const std::runtime_error &e){LOG_ERROR(e.what());}connection.set_response_end(true);return;}const char *response = "HTTP/1.1 404 Not Found\r\nServer: tubekit\r\nContent-Type: text/text; charset=UTF-8\r\n\r\n";try{connection.m_buffer.write(response, strlen(response));}catch (const std::runtime_error &e){LOG_ERROR(e.what());}connection.set_response_end(true);};
}

protobuf样例

#include "app/stream_app.h"
#include "proto_res/proto_cmd.pb.h"
#include "proto_res/proto_example.pb.h"
#include "proto_res/proto_message_head.pb.h"
#include <tubekit-log/logger.h>
#include <string>
#include <set>
#include "thread/mutex.h"
#include "utility/singleton.h"
#include "connection/connection_mgr.h"
#include "socket/socket.h"
#include "socket/socket_handler.h"using tubekit::app::stream_app;
using tubekit::connection::connection_mgr;
using tubekit::connection::stream_connection;
using tubekit::socket::socket;
using tubekit::socket::socket_handler;
using tubekit::utility::singleton;namespace tubekit::app
{std::set<void *> global_player;tubekit::thread::mutex global_player_mutex;
}int process_protocol(tubekit::connection::stream_connection &m_stream_connection, ProtoPackage &package)
{// EXAMPLE_REQif (package.cmd() == ProtoCmd::EXAMPLE_REQ){ProtoExampleReq exampleReq;if (exampleReq.ParseFromString(package.body())){LOG_ERROR("%s", exampleReq.testcontext().c_str());// std::cout << exampleReq.testcontext() << std::endl;}else{return -1;}return 0;}return -1;
}void stream_app::process_connection(tubekit::connection::stream_connection &m_stream_connection)
{using tubekit::app::global_player;using tubekit::app::global_player_mutex;uint64_t all_data_len = m_stream_connection.m_recv_buffer.can_readable_size();char *all_data_buffer = new char[all_data_len];m_stream_connection.m_recv_buffer.copy_all(all_data_buffer, all_data_len);uint64_t offset = 0;do{char *tmp_buffer = all_data_buffer + offset;uint64_t data_len = all_data_len - offset;if (data_len == 0){break;}ProtoPackage protoPackage;if (!protoPackage.ParseFromArray(tmp_buffer, data_len)){// std::cout << "protoPackage.ParseFromArray failed" << std::endl;break;}if (0 != process_protocol(m_stream_connection, protoPackage)){// std::cout << "process_protocol failed" << std::endl;m_stream_connection.mark_close();m_stream_connection.m_recv_buffer.clear();break;}// std::cout << "datalen " << data_len << " package size " << protoPackage.ByteSizeLong() << std::endl;offset += protoPackage.ByteSizeLong();} while (true);if (!m_stream_connection.m_recv_buffer.read_ptr_move_n(offset)){m_stream_connection.mark_close();}delete[] all_data_buffer;
}void stream_app::on_close_connection(tubekit::connection::stream_connection &m_stream_connection)
{using tubekit::app::global_player;using tubekit::app::global_player_mutex;global_player_mutex.lock();global_player.erase(m_stream_connection.get_socket_ptr());LOG_ERROR("player online %d", global_player.size());global_player_mutex.unlock();
}void stream_app::on_new_connection(tubekit::connection::stream_connection &m_stream_connection)
{using tubekit::app::global_player;using tubekit::app::global_player_mutex;global_player_mutex.lock();global_player.insert(m_stream_connection.get_socket_ptr());LOG_ERROR("player online %d", global_player.size());global_player_mutex.unlock();
}bool stream_app::new_client_connection(const std::string &ip, int port)
{socket::socket *socket_object = singleton<socket_handler>::instance()->alloc_socket();if (!socket_object){LOG_ERROR("alloc_socket return nullptr");return false;}bool b_ret = socket_object->connect(ip, port);if (!b_ret){LOG_ERROR("connection remote %s:%d failed", ip.c_str(), port);singleton<socket_handler>::instance()->remove(socket_object);return false;}int i_ret = singleton<socket_handler>::instance()->attach(socket_object);if (0 != i_ret){LOG_ERROR("attach to socket_handler error ret %d", i_ret);singleton<socket_handler>::instance()->remove(socket_object);return false;}// maybe to do some management for client socket...return true;
}

websocket

#include "app/websocket_app.h"
#include <vector>
#include <tubekit-log/logger.h>
#include "utility/singleton.h"
#include "connection/connection_mgr.h"
#include <arpa/inet.h>using namespace tubekit::app;
using namespace tubekit::utility;
using namespace tubekit::connection;struct websocket_frame
{uint8_t fin;uint8_t opcode;uint8_t mask;uint64_t payload_length;std::vector<uint8_t> masking_key;std::string payload_data;
};enum class websocket_frame_type
{CONNECTION_CLOSE_FRAME = 0,TEXT_FRAME = 1,BINARY_FRAME = 2,PONG = 3,PING = 4,CONTINUATION_FRAME = 5,ERROR = 6
};void websocket_app::process_connection(tubekit::connection::websocket_connection &m_websocket_connection)
{LOG_ERROR("process_connection");uint64_t all_data_len = m_websocket_connection.m_recv_buffer.can_readable_size();if (all_data_len <= 0){LOG_ERROR("all_data_len <= 0");return;}char *data = new (std::nothrow) char[all_data_len];if (!data){return;}all_data_len = m_websocket_connection.m_recv_buffer.copy_all(data, all_data_len);size_t index = 0;while (true){if (index >= all_data_len){break;}size_t start_index = index;websocket_frame frame;websocket_frame_type type = websocket_frame_type::ERROR;switch ((uint8_t)data[index]){case 0x81:{type = websocket_frame_type::TEXT_FRAME;break;}case 0x82:{type = websocket_frame_type::BINARY_FRAME;break;}case 0x88:{type = websocket_frame_type::CONNECTION_CLOSE_FRAME;break;}case 0x89:{type = websocket_frame_type::PING;break;}default:{if (data[index] >= 0x00 && data[index] <= 0x7F){type = websocket_frame_type::CONTINUATION_FRAME;}break;}}if (type != websocket_frame_type::TEXT_FRAME && type != websocket_frame_type::BINARY_FRAME){m_websocket_connection.mark_close();break;}frame.fin = (data[index] & 0x80) != 0;frame.opcode = data[index] & 0x0F;index++;if (index >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index, all_data_len);break;}frame.mask = (data[index] & 0x80) != 0;frame.payload_length = data[index] & 0x7F;index++;if (index >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index, all_data_len);break;}if (frame.payload_length == 126){frame.payload_length = 0;if (index + 2 >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index + 2, all_data_len);break;}uint16_t tmp = 0;u_char *ph;ph = (u_char *)&tmp;*ph++ = data[index];*ph++ = data[index + 1];tmp = ntohs(tmp);frame.payload_length = tmp;index += 2;}else if (frame.payload_length == 127){frame.payload_length = 0;if (index + 8 >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index + 8, all_data_len);break;}uint32_t tmp = 0;u_char *ph = (u_char *)&tmp;*ph++ = data[index++];*ph++ = data[index++];*ph++ = data[index++];*ph++ = data[index++];frame.payload_length = ntohl(tmp);frame.payload_length = frame.payload_length << 32;ph = (u_char *)&tmp;*ph++ = data[index++];*ph++ = data[index++];*ph++ = data[index++];*ph++ = data[index++];tmp = ntohl(tmp);frame.payload_length = frame.payload_length | tmp;}if (frame.payload_length == 0){break;}if (frame.mask){if (index + 4 >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index + 3, all_data_len);break;}frame.masking_key = {(uint8_t)data[index], (uint8_t)data[index + 1], (uint8_t)data[index + 2], (uint8_t)data[index + 3]};index += 4;}// payload data [data+index,data+index+frame.payload_length]if (index >= all_data_len){LOG_ERROR("index[%llu] >= all_data_len[%llu]", index, all_data_len);break;}if (index - 1 + frame.payload_length >= all_data_len){LOG_ERROR("index - 1 + frame.payload_length=[%llu] >= all_data_len[%llu]", index - 1 + frame.payload_length, all_data_len);break;}std::string payload_data(data + index, frame.payload_length);if (frame.mask){for (size_t i = 0; i < payload_data.size(); ++i){payload_data[i] ^= frame.masking_key[i % 4];}}frame.payload_data = std::move(payload_data);// broadcastsingleton<connection_mgr>::instance()->for_each([&frame](connection::connection &conn) -> void{websocket_connection *ptr_conn = static_cast<websocket_connection *>(&conn);websocket_app::send_packet(*ptr_conn, frame.payload_data.c_str(), frame.payload_length, false);});// websocket_app::send_packet(m_websocket_connection, frame.payload_data.c_str(), frame.payload_length, false);// frame.payload_data.push_back(0);// LOG_ERROR("%s", frame.payload_data.c_str());m_websocket_connection.m_recv_buffer.read_ptr_move_n(index - start_index + frame.payload_length);index += frame.payload_length;}delete[] data;
}void websocket_app::on_close_connection(tubekit::connection::websocket_connection &m_websocket_connection)
{LOG_ERROR("on_close_connection");
}void websocket_app::on_new_connection(tubekit::connection::websocket_connection &m_websocket_connection)
{LOG_ERROR("on_new_connection");
}bool websocket_app::send_packet(tubekit::connection::websocket_connection &m_websocket_connection, const char *data, size_t data_len, bool use_safe)
{if (!data){return false;}uint8_t opcode = 0x81;size_t message_length = data_len;std::vector<uint8_t> frame;frame.push_back(opcode);if (message_length <= 125){frame.push_back(static_cast<uint8_t>(message_length));}else if (message_length <= 0xFFFF){frame.push_back(126);frame.push_back((message_length >> 8) & 0xFF);frame.push_back(message_length & 0xFF);}else{frame.push_back(127);for (int i = 7; i >= 0; --i){frame.push_back((message_length >> (8 * i)) & 0xFF);}}frame.insert(frame.end(), data, data + data_len);if (!use_safe){return m_websocket_connection.send((const char *)frame.data(), frame.size());}return singleton<connection_mgr>::instance()->safe_send(m_websocket_connection.get_socket_ptr(), (const char *)frame.data(), frame.size());
}
http://www.lryc.cn/news/251785.html

相关文章:

  • 1688API接口系列,1688开放平台接口使用方案(商品详情数据+搜索商品列表+商家订单类)
  • CentOS服务器网页版Rstudio-server及R包批量安装最佳实践
  • centos7内核升级(k8s基础篇)
  • 数据结构与算法设计分析——NP完全理论
  • AGNES层次聚类
  • HCIP —— 双点重发布 + 路由策略 实验
  • Python标准库:datetime模块【侯小啾python领航班系列(二十五)】
  • 新版idea如何开启多台JVM虚拟机
  • 软件工程单选多选补充
  • 6-66.时间
  • 面试多线程八股文十问十答第一期
  • Mybatis 操作续集(结合上文)
  • JVM基础篇:垃圾回收
  • 蓝桥杯ACwing习题
  • vue发送请求携带token,拼接url地址下载文件
  • 【PTA-C语言】编程练习3 - 循环结构Ⅱ
  • Google Chrome 下载 (离线版)
  • 2023年GopherChina大会-核心PPT资料下载
  • 从源代码出发,Jenkins 任务排队时间过长问题的解决过程
  • openssl 生成CA及相关证书
  • App测试之App日志收集及adb常用命令
  • 【Java面试——并发基础、并发关键字】
  • 如何使用 Java 在Excel中创建下拉列表
  • Python安装步骤介绍
  • 学习80min快速了解大型语言模型(ChatGPT使用)笔记
  • SQL错题集1
  • uniapp运行到安卓基座app/img标签不显示
  • vscode非常好用的扩展插件
  • 一文弄懂BFS【广度优先搜索(Breadth-First Search)】
  • 深度学习记录--logistic回归函数的计算图