
Spark 3 Streaming: Reading from Kafka and Writing to ES

1. Code

package data_import

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.commons.lang3.exception.ExceptionUtils
import alarm.Alarm
import org.elasticsearch.spark.sql._

/** User feature records */
object KafkaImport {

  case class Parram(env: String, dst_table: String, kafka_address: String, kafka_topic: String, kafka_group_id: String, trigger_time: String)

  def main(args: Array[String]): Unit = {
    val param: Parram = utils.parseParam[Parram](args)
    println("args:" + param.toString())
    try {
      processTable(param)
      Alarm.alarm(env = param.env, level = Alarm.LevelInfo, content = "UserFeature Success")
    } catch {
      case e: Exception =>
        val msg = s"UserFeature handle failed, error message: ${e.getClass()}:${e.getMessage()}===>${ExceptionUtils.getStackTrace(e)}==>argsMap:${param.toString()}"
        println(msg)
        Alarm.alarm(env = param.env, level = Alarm.LevelWarning, content = msg)
    }
  }

  def processTable(param: Parram): Unit = {
    val conf = new SparkConf().setAppName("appName").setMaster("yarn")
    val ssc = new StreamingContext(conf, Seconds(param.trigger_time.toInt))
    val ss = SparkSession.builder
      .appName("KafkaImport")
      .config("spark.sql.mergeSmallFiles.enabled", "true")
      // NOTE: avgSize/maxSizePerTask normally take a size value; "true" is
      // carried over from the original post and is likely a copy/paste slip.
      .config("spark.sql.mergeSmallFiles.threshold.avgSize", "true")
      .config("spark.sql.mergeSmallFiles.maxSizePerTask", "true")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "YX2021@greendog")
      .getOrCreate()

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> param.kafka_address,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> param.kafka_group_id,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(param.kafka_topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val schema: StructType = StructType(List(
      StructField("key", StringType, true),
      StructField("value", StringType, true)
    ))

    stream.foreachRDD { rdd =>
      val data1 = rdd.map { record => Row(record.key, record.value) }
      val userData = ss.createDataFrame(data1, schema)
        .withColumn("id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("app_id", get_json_object(col("value"), "$.ctx.app_id").cast(StringType))
        .withColumn("user_id", get_json_object(col("value"), "$.ctx.user_id").cast(StringType))
        .withColumn("session_id", get_json_object(col("value"), "$.ctx.session_id").cast(StringType))
        .withColumn("time", get_json_object(col("value"), "$.time").cast(LongType))
        .withColumn("datetime", getDayHourTime(col("time")).cast(TimestampType))
      userData.show()

      // Debug only: read the target index back to verify ES connectivity.
      println("Trying to connect to ES...")
      val esDF = ss.read.format("org.elasticsearch.spark.sql")
        .option("es.nodes", "192.168.145.43")
        .option("es.port", "9200")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .load("test_saylo_user_feature_30033")
      println(s"The index contains ${esDF.count()} records")

      userData
        .select(col("id"), col("session_id"), col("value"), col("app_id"), col("datetime"))
        .filter(col("id").isNotNull && col("id") =!= "")
        .write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "192.168.145.43")
        .option("es.nodes.wan.only", "true")
        .option("es.port", "9200")
        .option("es.mapping.id", "id") // replace with your actual document-ID field
        // .option("es.mapping.type", "user_id:keyword") // left from the original; ES 7.x no longer supports mapping types
        .option("es.write.operation", "upsert")
        .option("es.net.http.auth.user", "elastic")
        .option("es.net.http.auth.pass", "YX2021@greendog")
        .mode("append")
        .save("test_saylo_user_feature_30033")

      // Commit Kafka offsets only after the batch has been written (at-least-once).
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    // Wait for the streaming context to stop.
    ssc.awaitTermination()
  }

  private val getDayHourTime = udf((timestamp: Long) => {
    utils.getDayTime(timestamp)
  })
}
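
The job references two project-local helpers the post does not show: utils.parseParam and utils.getDayTime (plus the alarm.Alarm client). Below is a minimal sketch of what they might look like so the example compiles; the names come from the code above, but every implementation detail here is an assumption, not the author's actual code (e.g. it assumes args arrive as key=value pairs):

import com.google.gson.Gson
import scala.reflect.ClassTag

// Hypothetical stand-ins for the project-local helpers used above.
object utils {
  // Assumed: parse CLI args of the form "env=prod kafka_topic=..." into the
  // given case class by round-tripping through JSON.
  def parseParam[T](args: Array[String])(implicit ct: ClassTag[T]): T = {
    val json = args.map { kv =>
      val Array(k, v) = kv.split("=", 2)
      s""""$k":"$v""""
    }.mkString("{", ",", "}")
    new Gson().fromJson(json, ct.runtimeClass).asInstanceOf[T]
  }

  // Assumed: convert a millisecond epoch timestamp to "yyyy-MM-dd HH:mm:ss",
  // which Spark can then cast to TimestampType as done in the UDF above.
  def getDayTime(timestamp: Long): String = {
    val fmt = java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    java.time.LocalDateTime
      .ofInstant(java.time.Instant.ofEpochMilli(timestamp), java.time.ZoneId.systemDefault())
      .format(fmt)
  }
}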

2. Dependencies

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xverse.saylo.rec</groupId>
  <artifactId>saylo_rec_data_offline_v2</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.2.2</spark.version>
    <jackson.version>2.14.0</jackson.version>
    <shade.jar.name>${project.artifactId}-${project.version}-jar-with-dependencies.jar</shade.jar.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-30_2.12</artifactId>
      <version>7.12.0</version>
    </dependency>
    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version> <!-- or whichever version you need -->
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.10.1</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.qcloud</groupId>
      <artifactId>cos_api</artifactId>
      <version>5.6.227</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.play</groupId>
      <artifactId>play-json_2.12</artifactId>
      <version>2.9.2</version>
    </dependency>
    <!-- Jackson dependencies -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${spark.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
      <version>1.64.0</version>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
      <version>1.64.0</version>
    </dependency>
    <!--
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.6</version>
    </dependency>
    -->
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-core_2.12</artifactId>
      <version>3.6.6</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.6.3</version>
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi</groupId>
      <artifactId>tencentcloud-sdk-java-cls</artifactId>
      <version>3.1.1174</version> <!-- use the latest version -->
    </dependency>
    <dependency>
      <groupId>com.tencentcloudapi.cls</groupId>
      <artifactId>tencentcloud-cls-sdk-java</artifactId>
      <version>1.0.15</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.33</version>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.6.0</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>utf-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${shade.jar.name}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <relocations>
                <!--
                <relocation>
                  <pattern>org.apache.commons</pattern>
                  <shadedPattern>com.acme.shaded.apachecommons</shadedPattern>
                </relocation>
                -->
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>my.project.shaded.protobuf</shadedPattern>
                </relocation>
              </relocations>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>my.Application</mainClass>
                  <manifestEntries>
                    <Implementation-Version>${version}</Implementation-Version>
                    <Main-Class>my.Application</Main-Class>
                  </manifestEntries>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
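
A mismatch between these artifacts and the cluster is a common source of failures: everything must be built for Scala 2.12, elasticsearch-spark-30 targets Spark 3.x, and 7.12.0 should correspond to the ES server version. A quick runtime sanity check, as a sketch:

// Print the Spark and Scala versions actually on the classpath at runtime;
// compare them with the versions declared in the pom (3.2.2 / 2.12.x here).
println(s"Spark: ${org.apache.spark.SPARK_VERSION}")
println(s"Scala: ${scala.util.Properties.versionString}")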

3. Notes

  1. Elasticsearch 7.x no longer supports specifying an index type
    If your ES version is 7.x, note that the save call should be written as
    .save("test_saylo_user_feature_30033")
    rather than
    .save("test_saylo_user_feature_30033/docs")
    Otherwise you will hit a type-conversion error such as:
    [user_id] cannot be changed from type [keyword] to [text]
    A minimal sketch of the two forms follows.
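
    In code, the only difference is the resource string passed to save (same writer settings as in section 1):

    // ES 7.x removed mapping types, so the resource is just the index name.
    userData.write
      .format("org.elasticsearch.spark.sql")
      .option("es.mapping.id", "id")
      .mode("append")
      .save("test_saylo_user_feature_30033")      // correct for ES 7.x

    // Legacy 6.x "index/type" form; on a 7.x cluster this surfaces as mapping
    // conflicts such as: [user_id] cannot be changed from type [keyword] to [text]
    // .save("test_saylo_user_feature_30033/docs")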

  2. Dependency conflicts
    A class cannot be found at runtime:

    Caused by: java.lang.ClassNotFoundException: com.acme.shaded.apachecommons.httpclient.HttpConnectionManager

    The com.acme.shaded.apachecommons prefix comes from the maven-shade-plugin relocation of org.apache.commons (shown commented out in the pom above). The fix is to declare commons-httpclient explicitly in the pom; see the full pom file above. A classpath-inspection sketch follows below.

    <dependency>
      <groupId>commons-httpclient</groupId>
      <artifactId>commons-httpclient</artifactId>
      <version>3.1</version>
    </dependency>
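
    When debugging this kind of ClassNotFoundException, it can help to confirm at runtime which jar (if any) actually provides a class. A minimal sketch using only standard JDK reflection; whereIs is a hypothetical helper name, not part of any library:

    // Print the jar a class is loaded from, or report that it is missing.
    // Run in spark-shell or temporarily from main() while debugging shading issues.
    def whereIs(className: String): Unit =
      try {
        val src = Class.forName(className).getProtectionDomain.getCodeSource
        println(s"$className -> ${if (src == null) "bootstrap classpath" else src.getLocation}")
      } catch {
        case _: ClassNotFoundException => println(s"$className -> NOT on the classpath")
      }

    whereIs("org.apache.commons.httpclient.HttpConnectionManager")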