当前位置: 首页 > article >正文

Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述

http://www.lryc.cn/news/2397009.html

相关文章:

  • Scratch节日 | 拯救屈原 | 端午节
  • rabbitmq Direct交换机简介
  • Git实战--基于已有分支克隆进行项目开发的完整流程
  • MapReduce(期末速成版)
  • 鸿蒙OSUniApp 移动端直播流播放实战:打造符合鸿蒙设计风格的播放器#三方框架 #Uniapp
  • C3、C2f、C3K2、C2PSA的具体结构
  • 2_MCU开发环境搭建-配置MDK兼容Keil4和C51
  • 通过远程桌面连接Windows实例提示“出现身份验证错误,无法连接到本地安全机构”错误怎么办?
  • 百度golang研发一面面经
  • TC3xx学习笔记-启动过程详解(一)
  • Scratch节日 | 六一儿童节抓糖果
  • 系统调用与程序接口的关系
  • 从线性方程组角度理解公式 s=n−r(3E−A)
  • 通信算法之280:无人机侦测模块知识框架思维导图
  • 【Doris基础】Apache Doris中的Coordinator节点作用详解
  • 软考 系统架构设计师之考试感悟3
  • 【Kubernetes-1.30】--containerd部署
  • Flutter 嵌套H5 传参数
  • 什么是线程上下文切换?
  • Jvm 元空间大小分配原则
  • 相机--相机标定
  • MongoDB(七) - MongoDB副本集安装与配置
  • 131. 分割回文串-两种回溯思路
  • [Java恶补day13] 53. 最大子数组和
  • 摩尔投票算法原理实现一文剖析
  • springboot项目下面的单元测试注入的RedisConnectionFactory类redisConnectionFactory值为什么为空呢?
  • MyBatis操作数据库(2)
  • C++面向对象(二)
  • 【C语言入门级教学】冒泡排序和指针数组
  • shell脚本中常用的命令