当前位置: 首页 > news >正文

flink sql1.18.0连接SASL_PLAINTEXT认证的kafka3.3.1

阅读此文默认读者对docker、docker-compose有一定了解。

环境

docker-compose运行了一个jobmanager、一个taskmanager和一个sql-client。

如下:

version: "2.2"
services:jobmanager:image: flink:1.18.0-scala_2.12container_name: jobmanagerports:- "7081:8081"command: jobmanagervolumes:- ./jobmanager:/opt/flinkenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager:image: flink:1.18.0-scala_2.12container_name: taskmanager1depends_on:- jobmanagercommand: taskmanagervolumes:- ./taskmanager1:/opt/flinkscale: 1environment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagertaskmanager.numberOfTaskSlots: 32sql-client:image: flink:1.18.0-scala_2.12container_name: sql-client-1command: bin/sql-client.shvolumes:- ./sql-client:/opt/flinkdepends_on:- jobmanagerenvironment:- |FLINK_PROPERTIES=jobmanager.rpc.address: jobmanagerrest.address: jobmanager

注意三个容器都映射了/opt/flink目录。需要先将/opt/flink目录拷贝到跟docker-compose.yml同一目录下,并分别重命名,如下图:

三个文件夹内容是一样的,只是名字不一样。

以上环境介绍完毕。

添加fllink-connector-kafka驱动

在maven官网下载相应jar包,分别放入上述三个文件夹的lib目录下。例如jobmanager:

启动sql-client,我用docker-compose启动的,因此命令为:docker-compose run sql-client.

创建kafka表

CREATE TABLE TestTable (`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',`partition` BIGINT METADATA VIRTUAL,`offset` BIGINT METADATA VIRTUAL,`presetBit` STRING,`imageTime` STRING,`imageName` STRING,`thumbnailWidth` BIGINT,`size` BIGINT,`thumbnailSize` BIGINT,`behavior` STRING,`imageUri` STRING,`presetId` STRING
) WITH ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='json','properties.security.protocol'='SASL_PLAINTEXT','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-pwd";'
);

说明:

①标识字段为kafka自带字段,topic中没有也会自带。

②标识的字段为topic中存储的字段,根据自己topic来。

③为自己的topic名称

④为kafka集群地址

⑤后面的username和password需要根据实际情况修改。

创建表以后执行select * from TestTable,可以看到类似下图的内容:

flink官网留下的坑

坑主要出现在最后一行。

flink官网是这样写的

首先指定的类不对,应该是org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule,如果按照官网写会报错:Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.plain.PlainLoginModule。

其次,username和password的双引号不需要写反斜杠,写反斜杠反而会报错。

http://www.lryc.cn/news/265553.html

相关文章:

  • pytorch张量的创建
  • Web自动化测试工具的优势分析
  • 黑豹程序员-读properties属性文件本地正常,打包jar后运行出错
  • PyQt6 QTimer计时器控件
  • Vue:defineAsyncComponent(异步组件)、component(动态组件)、keep-alive(缓存组件)
  • 14 款最佳文件恢复软件 [2024 年最佳精选工具]
  • Redis基础篇-004 Redis的Java客户端
  • 【数据结构和算法】---栈和队列的互相实现
  • 机场信息集成系统系列介绍(6):机场协同决策支持系统ACDM
  • GO设计模式——17、解释器模式(行为型)
  • 基于SSM的大学生兼职平台的设计与实现
  • Ignite内存配置
  • 前端基础vue路由懒加载
  • C++系列第九篇 数据类型下篇 - 复合类型(指针高级应用)
  • python三大开发框架django、 flask 和 fastapi 对比
  • html基础2
  • 基于博弈树的开源五子棋AI教程[5 启发式搜索]
  • JavaScript原型,原型链 ? 有什么特点?
  • Unity 问题 之 ScrollView ,LayoutGroup,ContentSizeFitter 一起使用时,动态变化时无法及时刷新更新适配界面的问题
  • linux 中 C++的环境搭建以及测试工具的简单介绍
  • 448. 找到所有数组中消失的数字
  • 为何在下雪天它“失宠”了,传统雪地靴居然不适合下雪穿
  • 第34节: Vue3 调用内联处理程序中的方法
  • JavaScript--明明白白Promise (Park One)
  • el-form与el-upload结合上传带附件的表单数据(后端篇)
  • postMessage——不同源的网页直接通过localStorage/sessionStorage/Cookies——技能提升
  • 上市公司-绿色投资者数据集(2000-2022)
  • 3 pandas之dataframe
  • vue-内网,离线使用百度地图(地图瓦片图下载静态资源展示定位)
  • OpenFeign 万字教程详解