当前位置: 首页 > news >正文

Java版Flink使用指南——将消息写入到RabbitMQ的队列中

大纲

  • 新建工程
    • 新增依赖
  • 编码
    • 自动产生数据
    • 写入RabbitMQ
  • 测试
  • 工程代码

在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);	

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

http://www.lryc.cn/news/397544.html

相关文章:

  • python excel openpyxl
  • C++八股(一)
  • 【Git的基本操作】版本回退 | 撤销修改的三种情况 | 删除文件
  • STM32-I2C
  • 04.ffmpeg打印音视频媒体信息
  • 微信开发授权登录梳理总结
  • HTML5实现我的音乐网站源码
  • UNI_App平台调试指南 debug(十五)
  • LLM之RAG实战(四十一)| 使用LLamaIndex和Gemini构建高级搜索引擎
  • 【错题集-编程题】AOE还是单体?(贪心)
  • 怎么办?我的C盘又爆红了!别慌!博主手把手带你管理你的C盘空间~
  • react启用mobx @decorators装饰器语法
  • 计算机如何学习
  • 【Python 基础】函数 - 1
  • 从0到1开发一个Vue3的新手引导组件(附带遇到的问题以及解决方式)
  • 概率统计(二)
  • 文件类:如何将excel文件转为csv文件(且保留时间格式)?
  • FiddlerScript Rules修改-更改发包中的cookie
  • 直升机停机坪的H代表什么
  • hyperworks软件许可优化解决方案
  • 四川赤橙宏海商务信息咨询有限公司抖音电商服务靠谱吗?
  • 鸿蒙开发:Universal Keystore Kit(密钥管理服务)【密钥派生(C/C++)】
  • 【ARMv8/v9 GIC 系列 6 -- 中断优先级详细介绍】
  • 【CORS 报错】跨域请求问题:CORS 多种环境下的解决方案
  • 【Scrapy】深入了解 Scrapy 中间件中的 process_spider_output 方法
  • GigE Vision GVCP/GVSP
  • 结合C++智能指针聊聊观察者模式
  • 【React】监听浏览器返回事件
  • python用selenium网页模拟时无法定位元素解决方法1
  • css中文字书写方向