当前位置: 首页 > news >正文

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType,  StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession = SparkSession.builder().appName("kafkaConsumer").master("local[3]").getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType = new StructType().add("id", dataType = IntegerType).add("name", dataType = StringType).add("sorce", dataType = IntegerType)val message: DataFrame = spark.readStream // message为从kafka读到的原数据.format("kafka").option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092").option("subscribe", "xxxx").option("startingOffsets", "latest").load()// 将json字符串转化为结构化数据val streamData: DataFrame = message.selectExpr("cast(value as String) as message") .select(from_json($"message", Jsonschmea).alias("data"))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"val sqluser = "xxxx"val sqlpass = "xxxxx"Class.forName("com.mysql.cj.jdbc.Driver")  // mysql 8.0后得驱动,旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords =>val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"val preparedStatement = connection.prepareStatement(insertsql)partitionOfRecords.foreach {row =>
//              val id = row.getAs[Int]("data.id")
//              val name = row.getAs[String]("data.name")
//              val score = row.getAs[Int]("data.sorce")val id = row.getAs[Row]("data").getAs[Int]("id")val name = row.getAs[Row]("data").getAs[String]("name")val sorce = row.getAs[Row]("data").getAs[Int]("sorce")// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close()  // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch.start().awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

http://www.lryc.cn/news/318021.html

相关文章:

  • IDEA如何删除git最新一次远程提交
  • 什么是单向数据流
  • Qt 线程池 QThreadPool
  • 【兔子机器人】实现从初始状态到站立
  • ImportError: cannot import name ‘open_filename‘ from ‘pdfminer.utils‘已搞定
  • 一文解决Word中公式插入问题(全免费/latex公式输入/texsWord)
  • C语言实战——扫雷游戏
  • .Net使用ElasticSearch
  • HTML5、CSS3面试题(二)
  • sqllab第十一关通关笔记
  • 机械女生,双非本985硕,目前学了C 基础知识,转嵌入式还是java更好?
  • Python之字符串操作大全(29种方法)
  • ArcGIS学习(十五)用地适宜性评价
  • 【matlab】如何将.mat文件与.nii文件互转
  • Uni-app开发Canvas当子组件示例,点点绘制图形
  • 从金蝶云星空到钉钉通过接口配置打通数据
  • Unreal发布Android在刘海屏手机上不能全屏显示问题
  • hive库表占用空间大小的命令
  • 关于go中的select
  • 【Node.js从基础到高级运用】十一、构建RESTful API
  • Python和MATLAB数字信号波形和模型模拟
  • 华为OD技术C卷“测试用例执行计划”Java解答
  • solana 入门 1
  • JavaEE之多线程(创建线程的五种写法)详解
  • ChatGPT国内能用吗?中国用户怎么才能使用ChatGPT?
  • 集群保持集群负载均衡和hash一致性
  • 吴恩达深度学习笔记:神经网络的编程基础2.9-2.14
  • 在C++项目中使用python脚本(四种)常见报错解决
  • 微前端框架 qiankun 配置使用【基于 vue/react脚手架创建项目 】
  • nodejs切换淘宝源