当前位置: 首页 > news >正文

PiflowX组件-ReadFromKafka

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic-1
topic_patternTOPIC_PATTERN“”匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。topic1_*
startup_modeSTARTUP_MODE“”Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”)Kafka consumer 的启动模式。earliest-offset
schemaSCHEMA“”Kafka消息的schema信息。id:int,name:string,age:int
formatFORMAT“”Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”)用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。json
groupGROUP“”Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。group_1
propertiesPROPERTIES“”Kafka source连接器其他配置

ReadFromKafka示例配置

{"flow": {"name": "DataGenTest","uuid": "1234","stops": [{"uuid": "0000","name": "DataGen1","bundle": "cn.piflow.bundle.flink.common.DataGen","properties": {"schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]","count": "100","ratio": "5"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","schema": "","format": "json","properties": "{}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "test","group": "test","startup_mode": "earliest-offset","schema": "id:int,name:string,age:int","format": "json","properties": "{}"}},{"uuid": "3333","name": "ShowData1","bundle": "cn.piflow.bundle.flink.common.ShowData","properties": {"showNumber": "5000"}}],"paths": [{"from": "DataGen1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "ShowData1"}]}
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[{       "filedName": "id","filedType": "INT","kind": "sequence","start": 1,"end": 10000},{       "filedName": "name","filedType": "STRING","kind": "random","length": 15},{       "filedName": "age","filedType": "INT","kind": "random","max": 100,"min": 1} 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

http://www.lryc.cn/news/269629.html

相关文章:

  • Ubuntu 安装MySQL以及基本使用
  • 基于Freeswitch实现的Volte网视频通知应用
  • 怎么实现Servlet的自动加载
  • 15. Mysql 变量的使用
  • 为什么ChatGPT采用SSE协议而不是Websocket?
  • Elasticsearch:使用 ELSER v2 文本扩展进行语义搜索
  • Matlab:BP神经网络算法,二叉决策树
  • Python实现员工管理系统(Django页面版 ) 七
  • 听GPT 讲Rust源代码--src/tools(34)
  • k8s的陈述式资源管理(命令行操作)
  • uniapp uview裁剪组件源码修改(u-avatar-cropper),裁出可自定义固定大小图片
  • 【机器学习前置知识】Beta分布
  • Notepad++批量更改文件编码格式及文档格式
  • Linux驱动开发学习笔记6《蜂鸣器实验》
  • 鸿蒙(HarmonyOS 3.1) DevEco Studio 3.1开发环境汉化
  • 毫米波雷达:从 3D 走向 4D
  • CENTOS docker拉取私服镜像
  • 【前端面经】即时设计
  • 前端三件套html/css/js的基本认识以及示例程序
  • 云计算:OpenStack 配置云主机实例的存储挂载并实现外网互通
  • python/selenium/jenkins整合
  • 华为路由器ACL操作SSH接口
  • Flutter 三点三:Dart Stream
  • centos 防火墙 设置 LTS
  • SAP缓存 表缓存( Table Buffering)
  • Mybatis插件入门
  • DOA估计算法——迭代自适应算法(IAA)
  • Python If语句以及代码块的基本介绍
  • [嵌入式专栏](FOC - SVPWM扇区计算Part1)
  • 亚马逊美国站ASTM F2613儿童折叠椅和凳子强制性安全标准