当前位置: 首页 > news >正文

linux C++监听管道文件方式

方式一(传统读取文件,一直监听循环读取文件)

非阻塞打开文件,用read循环定时读取,性能不好

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>constexpr int kBufferSize = 256;class FileDescriptor {
public:explicit FileDescriptor(int fd) : fd_(fd) {}~FileDescriptor() {if (fd_ >= 0) {::close(fd_);}}int get() const {return fd_;}private:int fd_;
};class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {stopListening_ = false;listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {stopListening_ = true;if (listeningThread_.joinable()) {listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}}private:void listenThread() {auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));if (fd->get() < 0) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}char buffer[kBufferSize];while (!stopListening_) {ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}std::this_thread::sleep_for(std::chrono::milliseconds(100));}}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_;
};int main() {PipeListener pipeListener("/home/hello/tmp/test_pipe"); // 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(100));pipeListener.stopListening();return 0;
}

方式二(用epoll监听管道文件内容)

但是要停止程序时会阻塞在epoll_wait,等待管道消息之后才能正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <cstring> constexpr int kBufferSize = 256;class FileDescriptor {
public:explicit FileDescriptor(int fd) : fd_(fd) {}~FileDescriptor() {if (fd_ >= 0) {std::cout << "close fd_ " << std::endl;::close(fd_);}}int get() const {return fd_;}private:int fd_;
};class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(false) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();std::cout << "close epollfd_ " << std::endl;close(epollfd_);}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {stopListening_ = false;listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {stopListening_ = true;if (listeningThread_.joinable()) {std::cout << " wait Stopped listening on pipe: " << pipePath_ << std::endl;listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}}private:void listenThread() {auto fd = std::make_unique<FileDescriptor>(::open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK));if (fd->get() < 0) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}int epollfd_ = epoll_create(1);if (epollfd_ == -1) {std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;return;}struct epoll_event event;event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered modeevent.data.fd = fd->get();if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, fd->get(), &event) == -1) {std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;return;}char buffer[kBufferSize];while (!stopListening_) {struct epoll_event events[1];int nfds = epoll_wait(epollfd_, events, 1, -1);if (nfds == -1) {std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;break;}for (int i = 0; i < nfds; ++i) {if (events[i].events & EPOLLIN) {ssize_t bytesRead = ::read(fd->get(), buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}}}}}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_;int epollfd_;  // 新增的成员变量用于保存 epollfd_
};int main() {PipeListener pipeListener("/home/hello/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/hello/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));// pipeListener.stopListening();std::cout << "main return" << std::endl;return 0;
}

方式三(用epoll监听管道文件内容)

用epoll监听管道文件内容,对象析构或者退出时会通过epoll唤醒epoll_wait,释放资源正常退出

代码如下:

#include <iostream>
#include <fstream>
#include <functional>
#include <thread>
#include <chrono>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <condition_variable>
#include <sys/eventfd.h>
#include <cstring>  constexpr int kBufferSize = 256;class PipeListener {
public:using DataCallback = std::function<void(std::string&&)>;PipeListener(const std::string& pipePath) : pipePath_(pipePath), stopListening_(true), epollfd_(0) {if (access(pipePath.c_str(), F_OK) == -1) {if (mkfifo(pipePath.c_str(), 0666) == -1) {std::cerr << "Failed to create the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;}}}~PipeListener() {stopListening();}void setDataCallback(DataCallback callback) {dataCallback_ = std::move(callback);}void startListening() {if (stopListening_ == false) {std::cout << "already start Listening " << std::endl;return;}stopListening_ = false;pipe_fd_ = open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);std::cout << "startListening pipe_fd_ ["<< pipe_fd_ <<"] " << std::endl;if (!pipe_fd_) {std::cerr << "Failed to open the pipe file: " << pipePath_ << ". Error: " << strerror(errno) << std::endl;return;}epollfd_ = epoll_create(1);std::cout << "startListening epollfd_ ["<< epollfd_ <<"] " << std::endl;if (epollfd_ == -1) {std::cerr << "Failed to create epoll instance. Error: " << strerror(errno) << std::endl;return;}struct epoll_event event;event.events = EPOLLIN | EPOLLET;  // Enable edge-triggered modeevent.data.fd = pipe_fd_;if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, pipe_fd_, &event) == -1) {std::cerr << "Failed to add file descriptor to epoll. Error: " << strerror(errno) << std::endl;return;}// 创建用于通知的 eventfd,监听要求停止监听管道文件事件,方便安全释放资源退出程序eventfd_ = eventfd(0, EFD_NONBLOCK);std::cout << "startListening eventfd_ ["<< eventfd_ <<"] " << std::endl;if (eventfd_ == -1) {std::cerr << "Failed to create eventfd. Error: " << strerror(errno) << std::endl;return;}event.events = EPOLLIN | EPOLLET;event.data.fd = eventfd_;if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, eventfd_, &event) == -1) {std::cerr << "Failed to add eventfd to epoll. Error: " << strerror(errno) << std::endl;return;}listeningThread_ = std::thread(&PipeListener::listenThread, this);std::cout << "Listening for data on pipe: " << pipePath_ << std::endl;}void stopListening() {if (stopListening_) {std::cout << "already stop Listening " << std::endl;return;}stopListening_ = true;// 写入一个字节到 eventfd 唤醒 epoll_wait,等待安全退出uint64_t value = 1;write(eventfd_, &value, sizeof(value));if (listeningThread_.joinable()) {listeningThread_.join();std::cout << "Stopped listening on pipe: " << pipePath_ << std::endl;}// 从 epoll 实例中删除文件描述符epoll_event event;event.data.fd = pipe_fd_;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);event.data.fd = eventfd_;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd_, EPOLL_CTL_DEL, pipe_fd_, &event);close(pipe_fd_);close(eventfd_);close(epollfd_);std::cout << "pipe_fd_ ["<< pipe_fd_ <<"] close " << std::endl;std::cout << "epollfd_ ["<< epollfd_ <<"] close " << std::endl;std::cout << "eventfd_ ["<< eventfd_ <<"] close " << std::endl;}private:void listenThread() {std::cout << "listenThread start " << pipePath_ << std::endl;char buffer[kBufferSize];while (true) {struct epoll_event events[2];int nfds = epoll_wait(epollfd_, events, 2, -1);if (stopListening_) {break;}if (nfds == -1) {std::cerr << "epoll_wait failed. Error: " << strerror(errno) << std::endl;}for (int i = 0; i < nfds; ++i) {if (events[i].data.fd == pipe_fd_ && (events[i].events & EPOLLIN)) {ssize_t bytesRead = ::read(pipe_fd_, buffer, sizeof(buffer));if (bytesRead > 0) {std::string data(buffer, bytesRead);if (!data.empty() && dataCallback_) {dataCallback_(std::move(data));}}} else if (events[i].data.fd == eventfd_ && (events[i].events & EPOLLIN)) {// 读取 eventfd,清空它uint64_t value;read(eventfd_, &value, sizeof(value));}}}std::cout << "listenThread exit " << pipePath_ << std::endl;}private:std::string pipePath_;DataCallback dataCallback_;std::thread listeningThread_;bool stopListening_ = true; // 默认状态停止的int epollfd_;int eventfd_;int pipe_fd_;
};int main() {PipeListener pipeListener("/home/woan/tmp/test_pipe");// 测试方式,启动程序之后,在终端用 echo "bt_upgrade" > /home/woan/tmp/test_pipe 命令就能触发pipeListener.setDataCallback([](std::string&& data) {std::cout << "Received size: " << data.size() << std::endl;std::cout << "Received data: " << data << std::endl;std::cout << "Received data (hex): ";for (char c : data) {std::cout << std::hex << (int)(unsigned char)c << " ";}std::cout << std::dec << std::endl;});pipeListener.startListening();std::cout << "main" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(10));pipeListener.stopListening();std::this_thread::sleep_for(std::chrono::seconds(2));pipeListener.startListening();std::this_thread::sleep_for(std::chrono::seconds(10));pipeListener.stopListening();return 0;
}

http://www.lryc.cn/news/257869.html

相关文章:

  • 【Qt开发流程】之UI风格、预览及QPalette使用
  • 数组实现循环队列(增设队列大小size)
  • [BJDCTF2020]EzPHP 许多的特性
  • Ubuntu开机出现Welcome to emergency mode解决办法
  • Android 7.1 默认自拍镜像
  • 设计模式(二)-创建者模式(5)-建造者模式
  • 学习使用三个命令实现在腾讯云服务器TencentOS Server 3.1或者CentOS 8上安装ffmpeg
  • Java 22种设计模式详解
  • 代码随想录算法训练营第四十八天 _ 动态规划_198.打家劫舍、213.打家劫舍II、337.打家劫舍 III。
  • 记录一下快速上手Springboot登录注册项目
  • 【LVGL】STM32F429IGT6(在野火官网的LCD例程上)移植LVGL官方的例程(还没写完,有问题 排查中)
  • Vue学习笔记-Vue3中ref和reactive函数的使用
  • 大数据分析与应用实验任务十一
  • “78Win-Vận mệnh tốt”Trang web hỗ trợ kỹ thuật
  • React中使用react-json-view展示JSON数据
  • 一文简述“低代码开发平台”到底是什么?
  • HNU计算机体系结构-实验3:多cache一致性算法
  • Go语言学习路线规划
  • 微软NativeApi-NtQuerySystemInformation
  • 灵活与高效的结合,CodeMeter Cloud Lite轻云锁解决方案
  • Flink 系列文章汇总索引
  • 计算机网络——期末考试复习资料
  • 【数据结构】面试OJ题——链表
  • flask web开发学习之初识flask(三)
  • 【设计模式-3.1】结构型——外观模式
  • flutter学习-day2-认识flutter
  • 解决selenium使用.get()报错:unknown error: unsupported protocol
  • 关于加密解密,加签验签那些事
  • 容器重启后,Conda文件完整保存(虚拟环境、库包),如何重新安装conda并迁移之前的虚拟环境
  • gitee对接使用