当前位置: 首页 > news >正文

pyflink过滤kafka数据

from pyflink.table import (TableEnvironment, EnvironmentSettings)# 输入、输出、过滤条件
columns_in = [
...
]columns_out = [
...
]
filter_condition = "name = '蒋介石' and sex = '男'"# 创建执行环境t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///work/flink-sql-connector-kafka-3.2.0-1.19.jar")source_topic = "foo"
sink_topic = "baa"
kafka_servers = "kafka:9092"
kafka_consumer_group_id = "flink consumer"columnstr = ','.join([f"`{col}` VARCHAR"  for col in columns_in])
source_ddl = f"""
CREATE TABLE kafka_source({columnstr}) WITH ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""columnstr2 = ','.join([f"`{col}` VARCHAR"  for col in columns_out])
sink_ddl = f"""
CREATE TABLE kafka_sink ({columnstr2}) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','format' = 'json')
"""
# 过滤字段
filtersql = f"""
insert into kafka_sink
select {
','.join([f"`{col}`"  for col in columns_out])
}
from kafka_source
where {filter_condition}
"""
t_env.execute_sql(filtersql)
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
http://www.lryc.cn/news/460228.html

相关文章:

  • Webpack 完整指南
  • 如何在 Ubuntu20.04 安装FTP Server vsftpd
  • 基于FPGA的DDS信号发生器(图文并茂+深度原理解析)
  • QT:绘制事件和定时器
  • 【算法——递归回溯】
  • 手机在网状态接口的使用和注意事项
  • WebGl 使用uniform变量动态修改点的颜色
  • Leetcode 划分字母区间
  • 可编辑div遇到的那些事
  • 什麼是高速HTTP代理?
  • 三子棋(C 语言)
  • HWS赛题 入门 MIPS Pwn-Mplogin(MIPS_shellcode)
  • 纯血鸿蒙启动公测,爱加密鸿蒙加固平台发布,助力鸿蒙应用安全运营!
  • MySQL中 truncate、drop和delete的区别
  • 什么开放式耳机值得买?开放式耳机推荐排行榜!
  • Apache Doris的分区与分桶详解
  • docker详解介绍+基础操作 (二)info详解
  • C0023.在Clion中创建控件,对控件进行提升为自定义控件的步骤
  • 探索 C# 常用第三方库与框架
  • NodeJS GRPC简单的例子
  • 无人机之三维航迹规划篇
  • 风格迁移-StyTr 2 : Image Style Transfer with Transformers
  • 上百种【基于YOLOv8/v10/v11的目标检测系统】目录(python+pyside6界面+系统源码+可训练的数据集+也完成的训练模型)
  • 记录搜罗到的Matlab 对散点进行椭圆拟合
  • 分享我最近使用《柬埔寨语翻译通》App的体验,不会说高棉语也能去柬埔寨旅游,畅通无阻!
  • 文本语义检索系统的搭建过程,涵盖了召回、排序以及Milvus召回系统、短视频推荐等相关内容
  • redis在项目中运用(基础)
  • libaom 源码分析系列:svc_encoder_rtc.cc 文件
  • MySQL备份和还原,用mysqldump、mysql和source命令来完成
  • MySQL Server、HeidiSQL(MySQL 数据库工具)