当前位置: 首页 > news >正文

.netcore grpc双向流方法详解

一、双向流处理概述

  1. 简单来讲客户端可以向服务端发送消息流,服务端也可以向客户端传输响应流,即客户端和服务端可以互相通讯
  2. 客户端无需发送消息即可开始双向流式处理调用 。 客户端可选择使用 RequestStream.WriteAsync 发送消息。 使用 ResponseStream.MoveNext() 或 ResponseStream.ReadAllAsync() 可访问从服务流式处理的消息。ResponseStream 没有更多消息时,双向流式处理调用完成。

二、案例简介

  1. 客户端发送请求流通过equestStream.WriteAsync传入到服务端
  2. 服务端响应到客户端的流通过ResponseStream.WriteAsync写入到客户端
  3. 服务端使用System.Threading.Channels保证线程安全交互

三、服务端配置(注意:grpc相关配置参考我之前的文章)

  1. 配置.proto文件
// 1.提供公共的实体proto文件
// 2.服务引用对应的proto文件
// 3.定义三个客户流方法//定义messages.proto文件令需要注意项目文件中的特性GrpcServices=None;syntax = "proto3";option csharp_namespace = "GrpcProject";package grpc.serviceing;// 消息推送/接收实体
message ExampleMessage
{string msg = 1;
}// 双向流文件twowaystream.protosyntax = "proto3";import "Protos/messages.proto";option csharp_namespace = "GrpcProject";package grpc.serviceing;service BothWaysRpc{// 双向流rpc StreamingBothWays(stream ExampleMessage) returns (stream ExampleMessage);
}
  1. 1 服务接口实现
    /// <summary>/// 双向流服务/// </summary>public class BothWaysService : BothWaysRpc.BothWaysRpcBase{/// <summary>/// 自动重置事件/// </summary>private readonly ManualResetEventSlim _event;public BothWaysService(){_event = new ManualResetEventSlim(false);}public override async Task StreamingBothWays(IAsyncStreamReader<ExampleMessage> requestStream,IServerStreamWriter<ExampleMessage> responseStream,ServerCallContext context){// 创建线程安全的有限容量通道var channel = Channel.CreateBounded<ExampleMessage>(new BoundedChannelOptions(capacity: 5));var task = Task.Run(async () =>{await foreach (var message in requestStream.ReadAllAsync()){// 读取消息 写入通道if (!string.IsNullOrWhiteSpace(message.Msg)){await Console.Out.WriteLineAsync($"记录客户端传入消息:{message.Msg}");// todo 消息处理await channel.Writer.WriteAsync(message, context.CancellationToken);}}}, context.CancellationToken);await foreach (var message in channel.Reader.ReadAllAsync()){// 打印通道接收的消息await Console.Out.WriteLineAsync($"通道传入消息:{message.Msg}");// 写入响应流ExampleMessage exampleMessage = new ExampleMessage() { Msg = $"我已经接收到消息:{message.Msg}" };await responseStream.WriteAsync(exampleMessage);if (message.Msg.ToLower() == "exit"){break;}}// 完结写入通道channel.Writer.Complete();await task;}}
  1. 2 Program注入
    public class Program{public static void Main(string[] args){var builder = WebApplication.CreateBuilder(args);builder.Services.AddGrpc();var app = builder.Build();// 一元方法//app.MapGrpcService<DollarService>();// 客户端流//app.MapGrpcService<ClientStreamService>();// 服务端流//app.MapGrpcService<ServerStreamService>();// 双向流app.MapGrpcService<BothWaysService>();app.Run();}}

四、客户端配置

  1. 引用proto文件,配置为客户端类型
  2. 根据编译生成的函数进行传参调用
  3. 创建WPF测试客户端

button按钮触发grpc

 

    /// <summary>/// BothWaysClient.xaml 的交互逻辑/// </summary>public partial class BothWaysClient : Window{public BothWaysClient(){InitializeComponent();}private async void Excute_Click(object sender, RoutedEventArgs e){Action<string> action = str => { txtValue.Text += $"{str}\r\n"; };await WpfClient.Show(action);txtValue.Text += "\r\n\r\n";}}

grpc客户端接口调用

        /// <summary>/// 双向流/// </summary>/// <param name="action"></param>/// <returns></returns>public static async Task Show(Action<string> action){var messages = new List<string>(){"test","one","two","three","false","four","Oooo","dddd","vvvfff","exit"};Random rnd = new Random(20);var channel = GrpcChannel.ForAddress("https://localhost:7188");var client = new GrpcProject.BothWaysRpc.BothWaysRpcClient(channel);var bothWays = client.StreamingBothWays();var requestTask = Task.Run(async () =>{while (true){var index = rnd.Next(messages.Count);var msg = messages[index];await bothWays.RequestStream.WriteAsync(new ExampleMessage { Msg = msg });if (msg == "exit"){break;}}});await foreach (var item in bothWays.ResponseStream.ReadAllAsync()){action(item.Msg);if (item.Msg == "我已经接收到消息:exit"){break;}}await requestTask;}

五、执行结果

服务端:

 客户端:

 六、源码地址

链接:https://pan.baidu.com/s/1uCirfbexPJ7C-AujBVtkCQ 
提取码:sd4y

七、后续进阶简介

  1. 接下来会讲解客户端工厂,优化客户端请求地址使用依赖注入提取各个服务
  2. proto文件各个字段详细介绍
  3. token认证
  4. 截止时间(中止请求)和请求取消
  5. AOP切面策略
  6. 重试策略(policy)
  7. 负载均衡策略(grpc本身提供的策略及nginx代理)
  8. 日志记录
  9. 健康检查
  10. 后续有更多特色功能会持续补充
http://www.lryc.cn/news/123827.html

相关文章:

  • 【Servlet】(Servlet API HttpServlet 处理请求 HttpServletRequest 打印请求信息 前端给后端传参)
  • java中右移>>和无符号右移>>>的区别
  • 牛客周赛 Round 7
  • R语言生存分析(机器学习)(1)——GBM(梯度提升机)
  • k8s和docker简单介绍
  • Lua学习记录
  • 三分钟完美解决你的C盘内存过大爆红
  • C++ - equal(比较两个vector元素)
  • 多线程:线程池
  • 9.3.2.2网络原理(传输层TCP)
  • ssm+mybatis无法给带有下划线属性赋值问题
  • 学习笔记-JVM监控平台搭建
  • 使用css实现时间线布局(TimeLine)
  • 深入浅出 栈和队列(附加循环队列、双端队列)
  • 前端基础(二)
  • ORB-SLAM2学习笔记7之System主类和多线程
  • gin的占位符:和通配符*
  • 【量化课程】08_2.深度学习量化策略基础实战
  • 12-数据结构-数组、矩阵、广义表
  • Idea 反编译jar包
  • 【Git】安装以及基本操作
  • Spring创建Bean的过程(2)
  • Linux 终端操作命令(2)内部命令
  • 【Git】大大大问题之syntax error near unexpected token `(‘ 的错误解决办法
  • Flink源码之TaskManager启动流程
  • 加入微软MCPP有什么优势?
  • leetcode做题笔记78子集
  • Skywalking-9.6.0系列之本地源码编译并启动
  • proteus结合keil-arm编译器构建STM32单片机项目进行仿真
  • 第五十三天