当前位置: 首页 > news >正文

Win10安装kafka并用C#调用

 kafka安装

jdk、kafka版本如下,zookeeper使用kafka自带版本

安装包下载位置:https://download.csdn.net/download/henreash/90087368 (赚点csdn下载资源分)

安装jdk后,解压kafka压缩包,修改配置文件:

kafka_2.13-3.9.0\config\zookeeper.properties内修改:dataDir=D:/Kafka/zookeeper/data

kafka_2.13-3.9.0\config\server.properties内修改:log.dirs=D:/Kafka/kafka-logs

在目录内创建批处理文件,启动zookeeper和kafka:

kfk.cmd内容:call bin/windows/kafka-server-start.bat config/server.properties

zk.bat内容:call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

双击zk.bat、kfk.cmd启动zookeeper和kafka

kafka-manager安装

将kafka-manager2解压到d盘,注意目录结构不能太深,否则启动报错。

修改config\application.conf,修改节点kafka-manager.zkhosts指向zookeeper

 kafka-manager.zkhosts="192.168.0.109:2181"

创建start.bat批处理文件,内容:.\bin\kafka-manager.bat

双击启动kafka-manager。

打开浏览器,输入地址:http://localhost:9000/

点击Cluster菜单,创建一个默认Cluster Test001;

kafka环境配置完毕。

C#调用

创建C#8项目,在nuget中下载Confluent.Kafka(2.6.1),如下代码进行消息发布和订阅。

       private async void simpleButton2_Click(object sender, EventArgs e){var config = new ProducerConfig { BootstrapServers = "localhost:9092"};using (var producer = new ProducerBuilder<Null, string>(config).Build()){var dr = await producer.ProduceAsync("test-topic", new Message<Null, string> { Value = "hello,world" });Debug.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");}}private void simpleButton3_Click(object sender, EventArgs e){Task.Factory.StartNew(() => {var config = new ConsumerConfig{GroupId = "test-consumer-group",BootstrapServers = "localhost:9092",AutoOffsetReset = AutoOffsetReset.Earliest};var cts = new CancellationTokenSource();using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){consumer.Subscribe("test-topic");try{while (true){try{var cr = consumer.Consume(cts.Token);Debug.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");}catch (Exception ex){Console.WriteLine(ex);}}}catch (Exception ex2){Console.WriteLine(ex2);}}});}

执行后kafka-manager界面如下图:

http://www.lryc.cn/news/500330.html

相关文章:

  • 高级架构二 Git基础到高级
  • 深入解析二叉树算法
  • 如何解决maven项目使用Ctrl + /添加注释时的顶格问题
  • 总结的一些MySql面试题
  • 渤海证券基于互联网环境的漏洞主动防护方案探索与实践
  • 用Go语言重写Linux系统命令 -- nc简化版
  • 面试复盘 part 02·1202-1207 日
  • Linux评估网络性能
  • 实战ansible-playbook(四) -文件操作重定向/追加
  • 简单题:1.两数之和
  • RTCMultiConnection 跨域问题解决
  • 23种设计模式之解释器模式
  • Postgresql内核源码分析-表数据膨胀是怎么回事
  • github使用SSH进行克隆仓库
  • 【Linux系统】 Linux内核与UNIX设计哲学的结合
  • 以太网PHY_RGMII通信(基于RTL8211)--FPGA学习笔记22
  • PowerShell 脚本实战:解决 GitLab 仓库文件批量重命名难题
  • 数据分析及应用:滴滴出行打车日志数据分析
  • Odoo :一款免费且开源的食品生鲜领域ERP管理系统
  • 请求路径中缺少必需的路径变量[xxxId]
  • 【在Linux世界中追寻伟大的One Piece】HTTP cookie
  • COLA学习之DDD各种术语分析(一)
  • Pygments:高效的语法高亮工具
  • 算法-字符串-43.字符串相乘
  • linux的vdagent框架设计
  • CV工程师专用键盘开源项目硬件分析
  • qtcanpool 知 08:Docking
  • Milvus向量数据库01-基础概念
  • mysql备份数据库
  • NLP与LLM的工程化实践与学习思考 - 写在开头