当前位置: 首页 > news >正文

kafka+ubuntu20.04+docker配置

记录一次配置过程

安装docker

参加下面链接的第一部分

Ubuntu20.04使用docker安装kafka服务-CSDN博客

安装zookeeper

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

安装kafka服务

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[你的IP地址]:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[你的IP地址]:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

改成我自己的本机的就是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.147.131:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.147.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

进入docker

执行docker ps -a命令,查看结果如下:

 注意上述结果显示kafka和zookeeper都是已关闭状态,需要开启,使用下面的命令:docker start 容器id,即

docker start a01c
docker start c0c7

注意要先启动父容器(或者说是依赖项zookeeper),再启动子容器kafka,容器id输入前几位有区分即可,不必全部输入,也不必补全(tab不会给你补全)

执行docker ps -a命令,再次查看结果如下:

ok这就启动成功了

然后执行命令进入docker

docker exec -it [container-id] /bin/sh    #container-id就是上图中c0c7这一串数字,执行命令时只需要输入前几个数字/字母即可
cd /opt/kafka/bin    #跳转到kafka目录下的bin目录,这下面有一些可执行脚本

换成我的本机就是执行下面的命令:

docker exec -it c0c7 /bin/sh    #container-id就是上图中41b8e4db352f这一串数字,执行命令时只需要输入前几个数字/字母即可
cd /opt/kafka/bin    #跳转到kafka目录下的bin目录,这下面有一些可执行脚本

得到下面的结果:

生产者消费者模式初体验

启动生产者

注意下面的ip:192.168.147.131要和你本机ip一致,使用ifconfig命令查看

./kafka-console-producer.sh --broker-list 192.168.147.131:9092 --topic wxj

打开另一个终端,启动消费者

 注意ip要一致,topic要一致

docker exec -it c0c7 /bin/sh
cd /opt/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.147.131:9092 --topic wxj

运行效果如下:

其他一点别的,就是常识啦,比如如何退出生产者消费者的环境,ctrl+c,ctrl+d都试试,如何退出容器,可以试试输入exit命令

ok 完事了,还是去探索腾讯云的kafka服务吧

常用命令行命令

首先进入kafka容器的环境中

docker exec -it c0c7 /bin/sh
cd /opt/kafka/bin

kafka-topics.sh

对主题topic进行增删改查的工具

常用选项如下:

--bootstrap-server:kafka服务器ip:port,需要查询指定的topic或者topic列表,必须指定这个量
--create:创建主题
--delete:删除主题
--describe:描述主题
--list:查看主题列表
--alter:修改主题的 partitions等
--topic <String: topic>:主题名
--topic-id <String: topic-id>:主题id
--partitions <Integer: # of partitions>:主题的partition


新增

命令

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic wxj2

效果

查看topic列表

命令

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

效果

查看topic详情(描述)

命令

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic wxj2

效果

修改

命令
以修改主题partiion数量为例

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic wxj2 --partitions 3

再次查看topic的描述发现分区完成 

删除

命令

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic wxj2

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具

常用选项如下:

--bootstrap-server:kafka服务器ip:port,必须的
--topic <String: topic> :Kafka主题,必须的
--sync:同步发送
--compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.
 

使用示例

kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

kafka-console-consumer.sh

常用选项如下:

--bootstrap-server:kafka服务器ip:port,必须的
--topic <String: topic> :Kafka主题,必须的
--group <String: consumer group id>:消费者组id
--key-deserializer <String: deserializer for key>:key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer <String: deserializer for values>:value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
--offset <String: consume offset>:消费的offset
--partition <Integer: partition>:消费的分区
 

使用示例

kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo
kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

上述命令的使用方法是进入到了docker容器kafka中了,要是不想进入环境直接运行命令的示例是

docker exec -it c0c7 /opt/kafka/bin/kafka-topics.sh  --bootstrap-server=127.0.0.1:9092 --create --topic demo

效果:

http://www.lryc.cn/news/230576.html

相关文章:

  • 遍历一个对象,并得出所对应的值
  • WGCLOUD的特点整理
  • 新版软考高项试题分析精选(三)
  • 从申请服务器到Docker部署Java项目至最后运行完结
  • 解决 requests.post 数据字段编码问题的方法
  • 安全运维:cmd命令大全(108个)
  • 构建Docker基础镜像(ubuntu20.04+python3.9.10+pytorch-gpu-cuda11.8)
  • Flowable自定义Id生成器
  • 怎样正确选择等保测评机构开展等保测评工作?
  • 【论文阅读笔记】Detecting AI Trojans Using Meta Neural Analysis
  • 【PyTorch教程】如何使用PyTorch分布式并行模块DistributedDataParallel(DDP)进行多卡训练
  • Istio学习笔记-体验istio
  • fastjson 系列漏洞
  • odoo前端js对象的扩展方法
  • 力扣双周赛 -- 117(容斥原理专场)
  • 基于Rabbitmq和Redis的延迟消息实现
  • Masked Relation Learning for DeepFake Detection
  • R语言爬虫程序自动爬取图片并下载
  • 2023年10月国产数据库大事记-墨天轮
  • Linux内核分析(十四)--内存管理之malloc、free 实现原理
  • Hive函数
  • 教资笔记(目录)
  • np.repeat()的注意事项
  • 239. 滑动窗口最大值
  • c++ barrier 使用详解
  • c# 接口
  • 1、NPC 三电平SVPWM simulink仿真
  • JAVA对象列表强转失败,更好的方法
  • 2023最新版本 从零基础入门C++与QT(学习笔记) -5- 动态内存分配(new)
  • asp.net校园招聘管理系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio