当前位置: 首页 > news >正文

kafka复习:(25)kafka stream

一、java代码:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class KafkaTest25 {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
}

二、启动console producer:

bin/kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic streams-plaintext-input

三、启动console consumer:

./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \--topic streams-wordcount-output \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

四、在producer端输入字符串(空格分割),看consumer输出

http://www.lryc.cn/news/159306.html

相关文章:

  • 接口自动化测试总结
  • 【Redis】Lua脚本在Redis中的基本使用及其原子性保证原理
  • 汇编--int指令
  • 生成式AI的JavScript技术栈
  • 从零开始学习软件测试-第39天笔记
  • 【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策
  • Vue2+Vue3基础入门到实战项目(六)——课程学习笔记
  • QT—基于http协议的网络文件下载
  • SpringBoot-配置优先级
  • 科普初步了解大模型
  • Nginx 和 网关的关系是什么
  • 解决springboot项目中的groupId、package或路径的混淆问题
  • Vmware 网络恢复断网和连接
  • 学生来看!如何白嫖内网穿透?点进来!
  • C++中的stack和queue
  • Ubuntu-22.04通过RDP协议连接远程桌面
  • 20230908java面经整理
  • uniapp 开发App 网络异常如何处理
  • docker安装常用软件
  • CocosCreator3.8研究笔记(五)CocosCreator 脚本说明及使用(下)
  • Adobe Acrobat Reader界面改版 - 解决方案
  • 实用调试技巧(2)
  • 海外ASO优化之如何优化游戏应用
  • SpringMVC: Java Web应用开发的框架之选
  • 【华为设备升级】AR路由器升级设备软件示例
  • Dataset 的一些 Java api 操作
  • Vue + Element UI 前端篇(十一):第三方图标库
  • HDFS:Hadoop文件系统(HDFS)
  • SpringMvc--综合案例
  • 工业4.0时代生产系统对接集成优势,MES和ERP专业一体化管理-亿发