当前位置: 首页 > news >正文

五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络

docker pull flink
docker network create flink-network

2、创建 jobmanager

# 创建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager 

3、创建 taskmanager

# 创建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager 

4、访问 http://localhost:8081/

在这里插入图片描述

4.1 修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:
在这里插入图片描述
在这里插入图片描述
修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

在这里插入图片描述

5、通过flinksql消费Kafka

确保有一个可用的kafka,如果没有,可以五分钟内,Docker搭建一个
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka

5.1 导入flink-sql-connector-kafka jar包

顾名思义,用于连接flinksql和kafka。
进入flink

docker exec -it jobmanager /bin/bash

进入 flink的bin目录

cd /opt/flink/bin

查看flink版本:

flink --version

可以看出,我的版本是1.18.0
在这里插入图片描述
根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:
在这里插入图片描述
点进去进行下载:
在这里插入图片描述

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:
在这里插入图片描述
可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

5.2 flinksql消费kafka

进入jobmanager中,执行

cd /opt/flink/bin
sql-client.sh

在这里插入图片描述
在这里插入图片描述
Flink SQL执行以下语句:

CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;

可以看到Flink在消费kafka数据,如下图:
在这里插入图片描述

http://www.lryc.cn/news/247941.html

相关文章:

  • 【小聆送书第一期】让架构师的成神之路温暖你这个不景气的冬天
  • 网页爬虫反扒措施有哪些?
  • C#实现批量生成二维码
  • 3种在ArcGIS Pro中制作山体阴影的方法
  • 【ChatGLM2-6B】Docker下部署及微调
  • 输入两个整数,输出它们的乘积。 ← Python 及 C++ 代码比较
  • C语言——从键盘输人一个表示年份的整数,判断该年份是否为闰年,并显示判断结果。
  • 出于隐私和安全的考虑,有时需要从谷歌删除你的个人数据,有两种方法
  • 【同一局域网下】两台电脑之间互ping
  • 【精选】Ajax技术知识点合集
  • 智能优化算法应用:基于水循环算法无线传感器网络(WSN)覆盖优化 - 附代码
  • java-netty知识点笔记和注意事项
  • 英伟达不同系列GPU介绍
  • C语言——I /深入理解指针(二)
  • MySQL使用函数和存储过程实现:向数据表快速插入大量测试数据
  • 力扣labuladong——一刷day59
  • 接口性能测试 —— Jmeter并发与持续性压测
  • redis报错3
  • Proteus的网络标号与总线
  • 4、stable diffusion
  • LeetCode51. N-Queens
  • 前端vue3——html2canvas给网站截图生成宣传海报
  • C语言实现串的部分算法
  • UE5、CesiumForUnreal实现加载GeoJson绘制多面(MultiPolygon)功能(支持点选高亮)
  • pandas教程:USDA Food Database USDA食品数据库
  • 0基础学习VR全景平台篇第122篇:VR视频剪辑和输出 - PR软件教程
  • ucharts中,当数据为0时,不显示
  • React函数组件渲染两次
  • 人工智能 - 图像分类:发展历史、技术全解与实战
  • go标准库