当前位置: 首页 > news >正文

Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//从文件中读取数据并转换为 类val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换val dataStream: DataStream[SensorReading] = inputStreamFromFile.map( data => {var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHostsval httpHost = new util.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201httpHost.add(new HttpHost("192.168.1.12",9200,"http"))httpHost.add(new HttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的functionval esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {//element 每一条数据 通过 index 发送override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {//包装写入 es 的数据val dataSource = new util.HashMap[String,String]()dataSource.put("sensor_id",element.id)dataSource.put("temp",element.temperature.toString)dataSource.put("ts",element.timestamp.toString)//indexval indexRequest = Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)index.add(indexRequest)println("saved successfully " + element.toString)}}//输出值 esdataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())env.execute("es")}
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

http://www.lryc.cn/news/269296.html

相关文章:

  • web三层架构
  • 智能优化算法应用:基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码
  • 写在2023年末,软件测试面试题总结
  • 51系列--数码管显示的4X4矩阵键盘设计
  • 医院绩效考核系统源码,java源码,商业级医院绩效核算系统源码
  • JavaScript基础练习题(五)
  • flutter项目从创建到运行,以及一些常用的命令
  • 【Amazon 实验②】Amazon WAF功能增强之使用Cloudfront、Lambda@Edge阻挡攻击
  • There are 4 missing blocks. The following files may be corrupted
  • 一起玩儿物联网人工智能小车(ESP32)——13. 用ESP32的GPIO控制智能小车运动起来(一)
  • D9741 PWM控制器电路,定时闩锁、短路保护电路,输出基准电压(2.5V) 采用SOP16封装
  • 【UE5.1】程序化生成Nanite植被
  • 【软件工程】漫谈增量过程模型:软件开发的逐步之道
  • Android Camera
  • Python开发雷点总结
  • Linux中磁盘管理与文件系统
  • Vue2+element-ui 实现select选择器结合Tree树形控件实现下拉树效果
  • LINUX 解决系统卡死:扩大内存交换分区
  • Vue项目Nginx代理F5刷新出现404问题解决
  • 关于MybatisPlus自动转化驼峰命名规则配置mapUnderscoreToCamelCase的个人测试和总结
  • css中的BFC
  • 音视频类App广告变现如何破局,最大化广告变现收益,让应用增收?
  • 基于llama-index对embedding模型进行微调
  • 如何本地搭建FastDFS文件服务器并实现远程访问【内网穿透】
  • spring基于Xml管理bean---Ioc依赖注入:对象类型属性赋值(2)----内部bean的引入(bean和bean之间的引入)、(3)级联方式注入
  • Python电能质量扰动信号分类(二)基于CNN模型的一维信号分类
  • 如何解决报错:Another app is currently holding yum lock?
  • electron使用electron-builder进行MacOS的 打包、签名、公证、上架、自动更新
  • RAD Studio 12 安装激活说明及常见问题
  • JavaScript实现视频共享