当前位置: 首页 > news >正文

大数据-玩转数据-Flink RedisSink

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class SinkRedis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer key) throws Exception {return key.intValue();}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop100").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).setDatabase(0).setPassword("redis").build();keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}@Overridepublic String getKeyFromData(Integer integer) {return integer.toString();}@Overridepublic String getValueFromData(Integer integer) {return integer.toString();}}));env.execute();}
}

可以根据要写入的redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

http://www.lryc.cn/news/122198.html

相关文章:

  • c++病毒/恶搞代码大全( 上 )
  • 【数学建模】清风数模更新5 灰色关联分析
  • Windows下运行Tomcat服务时报GC Overhead Limit Exceeded
  • OpenCV实例(八)车牌字符识别技术(一)模式识别
  • OPENCV C++(七)霍夫线检测+找出轮廓和外接矩形+改进旋转
  • Error: EACCES: permission denied, rename ‘/usr/local/lib/node_modules/appium‘
  • CentOS 7中,配置了Oracle jdk,但是使用java -version验证时,出现的版本是OpenJDK,如何解决?
  • 牛客 松鼠回家(二分答案+最短路)
  • Mysql in 查询的奇怪方向
  • ORB-SLAM2第二节---双目地图初始化
  • 后端常使用的中间件知识点--持续更新
  • 非科班的大家如何顺滑转码
  • webpack中常见的Loader
  • RabbitMQ:可靠消息传递的强大消息中间件
  • python 批量下载m3u8的视频
  • 最后一击
  • K8S资源管理方式
  • 第三章 图论 No.9有向图的强连通与半连通分量
  • 回归预测 | MATLAB实现基于PSO-LSSVM-Adaboost粒子群算法优化最小二乘支持向量机结合AdaBoost多输入单输出回归预测
  • Mysql 和Oracle的区别
  • 在收藏夹里“积灰”的好东西——“收藏从未停止,行动从未开始”
  • 【算法|数组】双指针
  • asp.net core6 webapi 使用反射批量注入接口层和实现接口层的接口的类到ioc中
  • 【2023】字节跳动 10 日心动计划——第九关
  • 小龟带你敲排序之冒泡排序
  • Nacos AP架构集群搭建(Windows)
  • nodejs+vue+elementui,图书评论管理系统_g9e3a
  • 基于TorchViz详解计算图(附代码)
  • 解决GitHub的速度很慢的几种方式
  • 设计模式再探——策略模式