当前位置: 首页 > news >正文

KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

2、在application.yml中配置上面配置类需要的参数

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

http://www.lryc.cn/news/126367.html

相关文章:

  • 包管理工具 nvm npm nrm yarn cnpm npx pnpm详解
  • 【java】mybatis-plus代码生成
  • 小样本UIE 信息抽取微调快速上手(不含doccona标注)
  • Vue项目(购物车)
  • 23.08.16驱动点灯
  • 数据结构——堆
  • 重复学习1:NLP
  • 做海外游戏推广有哪些条件?
  • JavaFx基础学习【五】:FXML布局文件使用
  • 通过Python爬虫提升网站搜索排名
  • 【博客698】为什么当linux作为router使用时,安装docker后流量转发失败
  • el-dialog嵌套,修改内层el-dialog样式(自定义样式)
  • B树和B+树区别
  • intelJ IDEA\PHPStorm \WebStorm\PyCharm 通过ssh连接远程Mysql\Postgresql等数据库
  • vfuhyuuy
  • CSS自学框架之表单
  • 使用Spring Boot和Redis实现用户IP接口限流的详细指南
  • 前端性能优化——包体积压缩插件,打包速度提升插件,提升浏览器响应的速率模式
  • 配置vscode
  • 【Spring】深入理解 Spring 事务及其传播机制
  • eclipse常用设置
  • ajax解析
  • CSS3:图片边框
  • (七)Unity VR项目升级至Vision Pro需要做的工作
  • 【计算机视觉|生成对抗】生成对抗网络(GAN)
  • 神经网络基础-神经网络补充概念-15-神经网络概览
  • iOS Epub阅读器改造记录
  • 负载均衡搭建
  • form表单input标签的23种type类型值?
  • python selenium如何保存网站的cookie用于下次自动登录