当前位置: 首页 > news >正文

【flink】之如何消费kafka数据并读写入redis?

背景: 

最近公司出现做了一个新需求,需求内容是加工一个营销时机,但是加工营销时机的同时需要把数据内容里的一个idmapping存入redis用于后续的读写。

准备: 

    <!-- 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency></dependencies>

代码:

package com.iterge.flink;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;/*** Hello world!**/
@Slf4j
public class FlinkDemo {//创建连接池static final JedisPool pool = new JedisPool("127.0.0.0",8423);//创建redis客户端static final Jedis jedis = pool.getResource();public static void main( String[] args ) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//DataStreamSource<String> stringDataStreamSource = env.fromData(Arrays.asList("1", "2", "3"));KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");stringDataStreamSource.map(new RichMapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {//读redisSystem.out.println("test="+jedis.get("test"));//写redisjedis.setex("test",60,s);return s;}@Overridepublic void close() throws Exception {super.close();jedis.close();}});stringDataStreamSource.print();env.execute("test");}
}

http://www.lryc.cn/news/447884.html

相关文章:

  • 搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(二)-索引
  • 离散化算法
  • 基于ollama的本地RAG实践
  • 安卓开发板_MTK开发板_联发科开发评估套件Demo板接口介绍
  • 代码随想录冲冲冲 Day58 图论Part9
  • UnityHub下载任意版本的Unity包
  • 网站服务器怎么计算同时在线人数?
  • [spring]MyBatis介绍 及 用MyBatis注解操作简单数据库
  • Ks渲染做汽车动画吗?汽车本地渲染与云渲染成本分析
  • AI智能时代:哪款编程工具让你的工作效率翻倍?
  • 这五本大模型书籍,让你从大模型零基础到精通,非常详细收藏我这一篇就够了
  • 面试经典150题 堆
  • day-62 每种字符至少取 K 个
  • 免费好用!AI声音克隆神器,超级简单,10秒就能克隆任何声音!(附保姆级教程)
  • LeetCode146 LRU缓存
  • 【Java】包装类【主线学习笔记】
  • 华为HarmonyOS地图服务 11 - 如何在地图上增加点注释?
  • uniapp js怎么根据map需要显示的点位,计算自适应的缩放scale
  • Mysql 架构
  • C语言 | Leetcode C语言题解之第429题N叉树的层序遍历
  • Python中列表常用方法
  • 『功能项目』下载Mongodb【81】
  • 图像特征提取-SIFT
  • ElasticSearch分页查询性能及封装实现
  • Python精选200Tips:176-180
  • 【Kotlin 集合概述】可变参数vararg、中缀函数infix以及解构声明(二十)
  • unity安装报错问题记录
  • 秋招|面试|群面|求职
  • 【Kubernetes】日志平台EFK+Logstash+Kafka【理论】
  • 基于SpringBoot+Vue+MySQL的教学资料管理系统