当前位置: 首页 > news >正文

[Spark Streaming] 读取 Kafka 消息, 插入到 MySQL

以下是一个简单的使用 Spark Streaming 读取 Kafka 消息、统计数据后插入到 MySQL 中的 Scala 代码示例:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import java.sql.DriverManagerobject KafkaToMysql {def main(args: Array[String]): Unit = {// 创建 SparkConfval conf = new SparkConf().setAppName("KafkaToMysql")// 创建 StreamingContextval ssc = new StreamingContext(conf, Seconds(5))// 设置 Kafka 相关参数val kafkaParams = Map[String, String]("bootstrap.servers" -> "your_kafka_broker:9092","group.id" -> "your_group_id")// 定义要读取的 Kafka 主题val topics = Array("your_topic")// 使用 KafkaUtils 创建 DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 解析和统计数据val data = kafkaStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)// 定义数据库连接相关信息val url = "jdbc:mysql://your_host:your_port/your_database"val username = "your_username"val password = "your_password"// 将统计结果插入到 MySQL 中data.foreachRDD(rdd => {rdd.foreachPartition(partition => {Class.forName("com.mysql.jdbc.Driver")val connection = DriverManager.getConnection(url, username, password)val statement = connection.createStatement()partition.foreach { case (word, count) =>val sql = s"INSERT INTO your_table (word, count) VALUES ('$word', $count)"statement.executeUpdate(sql)}connection.close()})})ssc.start()ssc.awaitTermination()}
}
http://www.lryc.cn/news/414555.html

相关文章:

  • 精选3款国内wordpress 主题,建站首选
  • JavaScript之 Uint8Array 类型数组(solana pda场景中的大小端)
  • 《Windows API每日一练》24.1 WinSock简介
  • openwrt编译Dockerfile
  • 【C语言】分支与循环(循环篇)——结尾猜数字游戏实现
  • 【数据结构】链表篇
  • Python SciPy介绍
  • docker镜像源
  • 【clion】clion打开文件目录卡死问题
  • [CR]厚云填补_GridFormer
  • PostgreSQL数据库内核(二):通过initdb传递guc参数
  • rust常用的宏使用记录(九)
  • 【Python机器学习】支持向量机——手写数字识别问题
  • 学习笔记-Cookie、Session、JWT
  • 题海战术,面试必胜秘诀
  • 设计模式详解(十九)——命令模式
  • 实战:MySQL数据同步神器之Canal
  • 5.6软件工程-运维
  • 在JavaScript中如何确保构造函数只被new调用
  • 【数据结构算法经典题目刨析(c语言)】反转链表(图文详解)
  • 机器学习之争:Python vs R,谁更胜一筹?
  • Vulnhub靶机:JANGOW_ 1.0.1
  • Python脚本实现USB自动复制文件
  • 【C++学习第19天】最小生成树(对应无向图)
  • 第一个 Flask 项目
  • 利用 Angular 发挥环境的力量
  • Vue3+TypeScript+printjs 实现标签批量打印功能
  • 微信文件如何直接打印及打印功能在哪里设置?
  • dataX -20240804-master分支
  • 【网络】传输层