当前位置: 首页 > news >正文

KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

http://www.lryc.cn/news/126413.html

相关文章:

  • 【数据结构】二叉树
  • 基于灰狼优化(GWO)、帝国竞争算法(ICA)和粒子群优化(PSO)对梯度下降法训练的神经网络的权值进行了改进(Matlab代码实现)
  • jenkins自动化构建保姆级教程(持续更新中)
  • HTTPS 的加密流程
  • Jmeter 参数化的几种方法
  • 剑指Offer45.把数组排成最小的数 C++
  • 【java毕业设计】基于SSM+MySql的人才公寓管理系统设计与实现(程序源码)--人才公寓管理系统
  • golang操作excel的高性能库——excelize/v2
  • 学习51单片机怎么开始?
  • [.NET学习笔记] -.NET6.0项目动态加载netstandard2.0报错但项目添加引用则正常的问题
  • 山景DSP芯片可烧录AP8224C2音频处理器方案
  • 来聊聊托管服务提供商(MSP)安全
  • 最新版本的Anaconda环境配置、Cuda、cuDNN以及pytorch环境一键式配置流程
  • 【数据结构与算法】十大经典排序算法-选择排序
  • 【Spring专题】Spring之Bean的生命周期源码解析——阶段一(扫描生成BeanDefinition)
  • 【C#】判断打印机共享状态
  • 运维监控学习笔记7
  • 【业务功能篇64】maven加速 配置settings.xml文件 镜像
  • Spring Boot(六十四):SpringBoot集成Gzip压缩数据
  • Mac安装opencv后无法导入cv2的解决方法
  • 【题解】按之字形顺序打印二叉树
  • 后端人员如何快速上手vue
  • 基于Prometheus监控Kubernetes集群
  • 【数据分析】pandas (三)
  • nvm命令
  • 从此已是义无反顾
  • Element组件浅尝辄止2:Card卡片组件
  • “深入剖析Java多态:点燃编程世界火花“
  • golang官方限流器rate包实践
  • [windows]MAT- 下载及安装