当前位置: 首页 > news >正文

net8.0一键创建支持(Kafka)

Necore项目生成器 - 在线创建Necore模板项目 | 一键下载

 KafkaController.cs

using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;namespace UnT.Template.Controllers
{[Route("api/kafkas")][ApiController]public class KafkaController : ControllerBase{private readonly IConfiguration _configuration;public KafkaController(IConfiguration configuration){_configuration = configuration;}[HttpPost("publish")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Insert(){try{var producerConfig = new ProducerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),ClientId = "UnT.Template",Acks = Acks.All, MessageSendMaxRetries = 3,RetryBackoffMs = 1000,LingerMs = 5 };// 创建生产者using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build()){var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });producer.Produce("unt_queue", new Message<Null, string> { Value = message },(deliveryReport) =>{if (deliveryReport.Error.Code != ErrorCode.NoError){Console.WriteLine($"消息发送失败: {deliveryReport.Error.Reason}");}else{Console.WriteLine($"消息发送到: {deliveryReport.TopicPartitionOffset}");}});producer.Flush(TimeSpan.FromSeconds(10));}return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}[HttpPost("consume")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Consume(){try{Task.Run(() =>{var consumerConfig = new ConsumerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),GroupId = "UnT.Template.Consumer.Group",EnableAutoCommit = false, AutoOffsetReset = AutoOffsetReset.Latest,EnablePartitionEof = true,StatisticsIntervalMs = 5000};using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()){//订阅主题consumer.Subscribe("unt_queue");//取消令牌,用于优雅退出var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {e.Cancel = true;cts.Cancel();};try{while (true){try{//消费消息var cr = consumer.Consume(cts.Token);if (cr.IsPartitionEOF){Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");continue;}//检查空消息if (cr.Message == null){Console.WriteLine("收到空消息");continue;}//处理有效消息Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");//手动提交偏移量(如果EnableAutoCommit=false)consumer.Commit(cr);}catch (ConsumeException e){Console.WriteLine($"消费错误: {e.Error.Reason}");}}}catch (OperationCanceledException){// 确保消费者正确关闭consumer.Close();}}});await Task.Delay(TimeSpan.FromSeconds(5));return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}}
}

http://www.lryc.cn/news/601587.html

相关文章:

  • 深度学习在自动驾驶车辆车道检测中的应用
  • 命令行和neovim的git操作软件-lazygit
  • GO语言 go get 下载 下来的包存放在哪里
  • MMAP 机制通俗易懂
  • 如何在 Ubuntu 24.04 或 22.04 中更改 SSH 端口
  • Qt C++动态库SDK在Visual Studio 2022使用(C++/C#版本)
  • 图像处理:第二篇 —— 选择镜头的基础知识及对图像处理的影响
  • sealos 方式安装k8s5节点集群
  • K8S 九 安全认证 TLS
  • 记录几个SystemVerilog的语法——时钟块和进程通信
  • 系统集成项目管理工程师【第九章 项目管理概论】 - 价值交付系统
  • C51:使用超声波测量距离
  • [10月考试] C
  • 零基础学习性能测试第五章:求最佳线程数
  • 抖音与B站爬虫实战,获取核心数据
  • Kotlin位运算
  • rust-模块树中引用项的路径
  • Python调用大模型api并部署到前端的主流技术栈以及具体框架对比
  • SecureCRT连接密钥交换失败
  • 问津集 #2:High Compression and Fast Search on Semi-Structured Logs
  • CPA全国青少年编程能力等级测评试卷及答案 Python编程(二级)
  • 第六章 JavaScript 互操(3)JS调用.NET
  • 攻击者可能会试图从bd.tao234窃取您的信息
  • 2024-2025华为ICT大赛中国区 实践赛网络赛道(高教组)全国总决赛 理论部分真题+解析
  • Sklearn 机器学习 数值指标 混淆矩阵confusion matrix
  • RS485转Profinet网关与JRT激光测距传感器在S7-1200 PLC系统中的技术解析与应用
  • 29.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--单体转微服务--用户配置服务
  • GitHub 趋势日报 (2025年07月25日)
  • 9.SpringBoot Web请求参数绑定方法
  • 设计模式(九)结构型:组合模式详解