当前位置: 首页 > news >正文

【flink】之如何消费kafka数据?

为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。

 1.环境准备

确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181

2.Maven项目设置

创建一个新的Maven项目,并在pom.xml中添加以下依赖:

<dependencies>  <!-- Flink dependencies -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <!-- Kafka client dependency -->  <dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.0</version>  </dependency>  <!-- Logging -->  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId>  <version>1.7.30</version>  </dependency>  
</dependencies>

注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。

3.编写Flink Kafka Consumer代码

import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;  import java.util.Properties;  public class FlinkKafkaConsumerDemo {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // Kafka消费者属性  Properties props = new Properties();  props.put("bootstrap.servers", "localhost:9092");  props.put("group.id", "test-group");  props.put("enable.auto.commit", "true");  props.put("auto.commit.interval.ms", "1000");  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 创建Kafka消费者  FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(  "input-topic", // Kafka topic  new SimpleStringSchema(), // 反序列化器  props);  // 添加数据源  DataStream<String> stream = env.addSource(myConsumer);  // 数据处理  stream.map(new MapFunction<String, String>() {  @Override  public String map(String value) throws Exception {  return "Received: " + value;  }  }).print();  // 执行流程序  env.execute("Flink Kafka Consumer Example");  }  // 简单的字符串反序列化器  public static final class SimpleStringSchema implements DeserializationSchema<String> {  @Override  public String deserialize(byte[] message) throws IOException {  return new String(message, "UTF-8");  }  @Override  public boolean isEndOfStream(String nextElement) {  return false;  }  @Override  public TypeInformation<String> getProducedType() {  return BasicTypeInfo.STRING_TYPE_INFO;  }  }  
}

4.执行程序

  1. 确保Kafka正在运行,并且有一个名为input-topic的topic(如果没有,你需要先创建它)。
  2. 编译并运行你的Maven项目
http://www.lryc.cn/news/407654.html

相关文章:

  • 科研绘图系列:R语言山脊图(Ridgeline Chart)
  • Boost搜索引擎:如何建立 用户搜索内容 与 网页文件内容 之间的关系
  • 【QT】QT 窗口(菜单栏、工具栏、状态栏、浮动窗口、对话框)
  • Golang | Leetcode Golang题解之第283题移动零
  • ubuntu22.04 安装 NVIDIA 驱动以及CUDA
  • 数据结构·AVL树
  • 记一次Mycat分库分表实践
  • 数据分析:微生物数据的荟萃分析框架
  • Django—admin后台管理
  • 数字图像处理中的常用特殊矩阵及MATLAB应用
  • vue侦听器(Watch)精彩案例剖析一
  • HTTP 协议浅析
  • VsCode | 让空文件夹始终展开不折叠
  • Centos7_Minimal安装Cannot find a valid baseurl for repo: base/7/x86_6
  • Spark_Oracle_II_Spark高效处理Oracle时间数据:通过JDBC桥接大数据与数据库的分析之旅
  • 力扣 459重复的子字符串
  • MyBatis XML配置文件
  • 读写RDS或RData等不同格式的文件,包括CSV和TXT、Excel的常见文件格式,和SPSS、SAS、Stata、Minitab等统计软件的数据文件
  • Android 支持的媒体格式,(二)视频支持格式
  • 密码学原理精解【8】
  • 2024年钉钉杯大数据竞赛A题超详细解题思路+python代码手把手保姆级运行讲解视频+问题一代码分享
  • unity2D游戏开发01项目搭建
  • 删除的视频怎样才能恢复?详尽指南
  • LeetCode160 相交链表
  • 高性能响应式UI部件DevExtreme v24.1.4全新发布
  • Python实现Java mybatis-plus 产生的SQL自动化测试SQL速度和判断SQL是否走索引
  • UDP的报文结构及其注意事项
  • MySQL深度分页问题深度解析与解决方案
  • C#类型基础Part1-值类型与引用类型
  • 被上市公司预判的EPS增速分析