当前位置: 首页 > news >正文

flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  age INT,\n" +"  proctime as PROCTIME()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = '" + kafkaTopic + "',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = '" + groupId + "',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +"  window_start STRING,\n" +"  window_end STRING,\n" +"  name STRING,\n" +"  age INT\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'wxt2',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = 'kafka_table2',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +"  window_start String,\n" +"   window_end String,\n" +"    name String,\n" +"    age INT\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +"   'username' = 'root',\n" +"   'password' = '12345678',\n" +"   'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by  window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>
http://www.lryc.cn/news/207554.html

相关文章:

  • pdf转jpg的方法【ps和工具方法】
  • 【已解决】Qt发送信号后,槽函数没有响应
  • Kafka入门05——基础知识
  • WordPress(7)配置邮箱发送功能
  • 简化路径(C++解法)
  • CS224W1.1——图机器学习介绍
  • docker搭建waline评论系统
  • sql server 生成连续日期和数字
  • 太极v14.0.4 免ROOT用Xposed
  • python DevOps
  • Git(四)底层命令:git对象、树对象、提交对象
  • LVS-DR模式+keepalived+nginx+tomcat实现动静分离、负载均衡、高可用实验
  • canvas 状态管理
  • vue中如何给后端过来的数组中每一个对象加一个新的属性和新的对象(不影响后端的原始数据)
  • SpringAOP源码解析之TargetSource(四)
  • Centos7 安装nvidia显卡驱动
  • 22 行为型模式-状态模式
  • Jetpack:018-Jetpack中的导航一
  • Linux常见问题解决操作(yum被占用、lsb无此命令、Linux开机进入命令界面等)
  • 层次式架构的设计理论与实践
  • 【shell】read -t -n1
  • 【嵌入式项目应用】__cJSON在单片机的使用
  • 【智能家居】
  • Android stdio 无法新建或打开AIDL文件(解决方法)
  • 如何进行渗透测试以提高软件安全性?
  • YOLOv5 添加 OTA,并使用 coco、CrowdHuman数据集进行训练。
  • SpringBoot 日志
  • 非小米笔记本小米妙享中心安装最新教程 3.2.0.464 兼容所有Windows系统
  • 基于大数据的社交平台数据爬虫舆情分析可视化系统 计算机竞赛
  • MYSQL(事务)