当前位置: 首页 > news >正文

tokio tcp通信

引入crate

tokio = { version = "1.35.1", features = ["full"] }

服务端

use std::time::Duration;
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt},net::{tcp::{OwnedReadHalf, OwnedWriteHalf},TcpListener, TcpStream,},sync::mpsc,
};#[tokio::main]
async fn main() {println!("Begin Start Server...");let server = TcpListener::bind("127.0.0.1:10888").await.unwrap();while let Ok((client_stream, client_addr)) = server.accept().await{println!("accept client: {}", client_addr);tokio::spawn(async move{process_client(client_stream).await;});}
}async fn process_client(client_stream: TcpStream){let (client_reader, client_writer) = client_stream.into_split();let (msg_tx, msg_rx) = mpsc::channel::<String>(100);let mut read_task = tokio::spawn(async move {read_from_client(client_reader, msg_tx).await;});let mut write_task = tokio::spawn(async move{write_to_client(client_writer, msg_rx).await;});if tokio::try_join!(&mut read_task, &mut write_task).is_err() {read_task.abort();write_task.abort();};
}async fn read_from_client(reader: OwnedReadHalf, mst_tx: mpsc::Sender<String>){let mut buf_reader = tokio::io::BufReader::new(reader);let mut buf = String::new();loop{match buf_reader.read_line(&mut buf).await{Err(_e) =>{eprintln!("read from client error");break;}Ok(0) =>{println!("client closed");break;}Ok(n) => {buf.pop(); //去除末尾的回车符let mut content = buf.drain(..).as_str().to_string();println!("read {} bytes from client. content: {}", n, content);tokio::time::sleep(Duration::from_secs(1)).await;content.push('\n');if mst_tx.send(content).await.is_err(){eprintln!("receiver closed");break;}}}}
}async fn write_to_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>){let mut buf_writer = tokio::io::BufWriter::new(writer);while let Some(mut str) = msg_rx.recv().await{//str.push('\n');if let Err(e) = buf_writer.write_all(str.as_bytes()).await {eprintln!("write to client failed: {}", e);break;}buf_writer.flush().await;print!("write to client: {}", str);}
}

客户端

use std::sync;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{Local, NaiveDateTime};use tokio::{io::{Interest, AsyncBufReadExt, AsyncWriteExt},net::{tcp::{OwnedReadHalf, OwnedWriteHalf},TcpStream,},sync::mpsc,
};#[tokio::main]
async fn main() {let stream = TcpStream::connect("127.0.0.1:10888").await.unwrap();let (reader, writer) = stream.into_split();let mut buf = String::new();//[0u8, 12];let mut buf_reader = tokio::io::BufReader::new(reader);let mut buf_writer = tokio::io::BufWriter::new(writer);loop{let now=Local::now();let formatted=now.format("%Y-%m-%d %H:%M:%S");let content = format!("hello world {}\n", formatted);//buf_writer.write_all(b"hello world\n").await;buf_writer.write_all(content.as_bytes()).await;buf_writer.flush().await;println!("send:{}", content);match buf_reader.read_line(&mut buf).await {Err(_e) =>{eprintln!("read from server error");break;}Ok(n) =>{buf.pop();let content = buf.as_str().to_string();println!("received: {}  {}", n, content);}};}}

客户端、服务端都使用TcpStream的into_split方法获取网络通信读和写实例,进而获取buffer读写对象,通过channel实现线程执行同步。子线程使用tokio::spawn函数启动。

http://www.lryc.cn/news/300756.html

相关文章:

  • LCR 122. 路径加密【简单】
  • SpringUtils 工具类,方便在非spring管理环境中获取bean
  • JavaWeb之请求
  • VsCode中常用的正则表达式操作
  • ubuntu22.04@laptop OpenCV Get Started: 007_color_spaces
  • mysql 查询性能优化关键点总结
  • React - 分页插件默认是英文怎么办
  • 揭开Markdown的秘籍:引用|代码块|超链接
  • 【C语言】Debian安装并编译内核源码
  • 使用 C++23 从零实现 RISC-V 模拟器(6):权限支持
  • 针对某终端安全自检钓鱼工具的分析
  • XSS数据接收平台
  • MySQL 基础知识(六)之数据查询(一)
  • C#使用哈希表对XML文件进行查询
  • vscode写MATLAB配置
  • 第13章 网络 Page734 “I/O对象”的链式传递 单独的火箭发射函数,没有用对的智能指针
  • Git 存储大文件
  • 使用 Mermaid 创建流程图,序列图,甘特图
  • 政安晨:在Jupyter中【示例演绎】Matplotlib的官方指南(二){Image tutorial}·{Python语言}
  • gem5学习(20):替换策略——Replacement Policies
  • 嵌入式Qt Qt中的字符串类
  • 函数高级(C++)
  • jmeter-10调试取样器
  • C#,二进制数的按位旋转(Bits Rotate)算法与源代码
  • 解决ubuntu登录密码问题
  • Ubuntu忘记登录密码重置步骤
  • MySQL数据库基础(五):SQL语言讲解
  • python-使用ffmpeg批量修改文件的后缀名
  • 关于jupyter的一些小笔记
  • macOS 安装 conda