当前位置: 首页 > news >正文

大数据-玩转数据-Flink 自定义Sink(Mysql)

一、说明

如果Flink没有提供给我们可以直接使用的连接器,那我们如果想将数据存储到我们自己的存储设备中,mysql 的安装使用请参考
mysql-玩转数据-centos7下mysql的安装
创建表

CREATE TABLE `sensor` (`id` int(10)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

二、pom.xml 导入驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

三、编写程序

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkMysql {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer value) throws Exception {return value.intValue();}});keyedStream.addSink(new MysqlSink());env.execute();}public static class  MysqlSink extends RichSinkFunction<Integer>{private Connection sunbo;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");sunbo = DriverManager.getConnection("jdbc:mysql://192.168.220.100:3306/test?useSSL=false", "sunbo", "Mysql123456#");}@Overridepublic void close() throws Exception {if (sunbo != null) {sunbo.close();}}@Overridepublic void invoke(Integer value, Context context) throws Exception {String sql = "insert into sensor(id)values(?)";PreparedStatement ps = sunbo.prepareStatement(sql);ps.setInt(1,value.intValue());ps.execute();ps.close();}}
}

四、运行测试

在这里插入图片描述

http://www.lryc.cn/news/128074.html

相关文章:

  • linux17 线程安全 线程同步
  • lvs集群与nat模式
  • 【开源分享】在线客服系统搭建-基于php和swoole客服系统CRMchat(附源码完整搭建教程)...
  • Webpact学习笔记记录
  • Python代码实现解析MULTIPOLYGON几何对象类型数据为嵌套列表
  • SSH连接工具汇总
  • Java的AQS框架是如何支撑起整个并发库的
  • 一.net core 自动化发布到docker (Jenkins安装)
  • 二刷LeetCode--148. 排序链表(C++版本),必会题,思维题
  • css flex 上下结构布局
  • win下qwidget全屏弹窗后其他窗口鼠标样式无法更新的问题
  • Java【数据结构】二分查找
  • 数据库技术--数据库引擎,数据访问接口及其关系详解(附加形象的比喻)
  • 【BASH】回顾与知识点梳理(三十三)
  • 同步请求和异步请求
  • Transformer是什么,Transformer应用
  • 故障011:dmap服务缺失libnsl.so修复
  • 第十三章 SpringBoot项目(总)
  • 利用Python隧道爬虫ip轻松构建全局爬虫网络
  • Spring Clould 网关 - Gateway
  • PHP使用phpmailer及SMTP服务实现邮件发送
  • 交换实验一
  • 计算机中丢失MSVCR120.dll,找不到MSVCR120.dll是什么意思?
  • avue多选列表根据后端返回的某个值去判断是否选中;avue-curd多选回显
  • Vue2中根据权限添加动态路由
  • 搭建 Python 环境 | Python、PyCharm
  • NPOI 读取和写入Excel
  • Linux工具【2】(调试器gdb、项目自动化构建工具make/Makefile)
  • C++ 网络编程项目fastDFS分布式文件系统(三)-Nginx部分
  • Apache-DBUtils