当前位置: 首页 > news >正文

Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

新增Json库依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.17.1</version></dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.32</version><scope>provided</scope></dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {private Long id;private String name;private int age;private Boolean married;private Double salary;public String toJson() throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(this);}public static SampleData fromJson(String json) throws JsonProcessingException {ObjectMapper mapper = new ObjectMapper();return mapper.readValue(json, SampleData.class);}
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;import java.io.IOException;public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {@Overridepublic SampleData deserialize(byte[] message) throws IOException {return SampleData.fromJson(new String(message));}@Overridepublic boolean isEndOfStream(SampleData nextElement) {return false;}@Overridepublic TypeInformation<SampleData> getProducedType() {return TypeInformation.of(SampleData.class);}
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

http://www.lryc.cn/news/395363.html

相关文章:

  • CV每日论文--2024.7.8
  • 【AI大模型】赋能儿童安全:楼层与室内定位实践与未来发展
  • 云服务器linux系统安装配置docker
  • 泰勒雷达图2
  • 数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比
  • react根据后端返回数据动态添加路由
  • 机器学习中的可解释性
  • 上海慕尼黑电子展开展,启明智显携物联网前沿方案亮相
  • Centos7离线安装ElasticSearch7.4.2
  • 深入理解sklearn中的模型参数优化技术
  • 【Elasticsearch】开源搜索技术的演进与选择:Elasticsearch 与 OpenSearch
  • 欧拉openEuler 22.03 LTS-部署k8sv1.03.1
  • 老年生活照护实训室:为养老服务业输送专业人才
  • go语言中使用WaitGroup和channel实现处理多线程问题
  • Open3D 计算点云的平均密度
  • C语言之数据在内存中的存储(1),整形与大小端字节序
  • B端全局导航:左侧还是顶部?不是随随便便,有依据在。
  • 什么是海外仓管理自动化?策略及落地实施步骤指南
  • 自定义控件三部曲之绘图篇(六)Paint之函数大汇总、ColorMatrix与滤镜效果、setColorFilter
  • 请写sql满足业务:找到连续登录3天以上的用户
  • fatal error: apriltag/apriltag.h: No such file or directory 的 参考解决方法
  • C++继承(一文说懂)
  • 卷积神经网络可视化的探索
  • RxJava学习记录
  • Spring Boot Vue 毕设系统讲解 3
  • Spring Boot对接大模型:实战价值与技巧
  • 完美解决NameError: name ‘file‘ is not defined的正确解决方法,亲测有效!!!
  • Witness Table 的由来
  • Python 3 AI 编程助手
  • 【nginx】nginx的配置文件到底是什么结构,到底怎么写?