当前位置: 首页 > news >正文

关于springboot创建kafkaTopic

工具类提供,方法名见名知意。使用kafka admin


import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;import java.util.*;
import java.util.concurrent.ExecutionException;import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
/*** @author: zhoumo* @data: 2024/6/24 16:37* @descriptions:*/
public class KafkaTopicInfo {final static String ip="127.0.0.1:9090";public static void main(String[] args) {getListDetail();}public static void createTopic(String topicName) throws ExecutionException, InterruptedException {// Kafka 配置Properties props = new Properties();// Kafka 服务器地址和端口props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip);// 创建 AdminClient 实例try (AdminClient adminClient = AdminClient.create(props)) {// 创建一个新的主题// 指定分区数量// 指定复制因子int numPartitions = 2;short replicationFactor = 1;NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);// 创建主题adminClient.createTopics(Collections.singletonList(newTopic)).all().get();System.out.println("Topic created successfully: " + topicName);} catch (Exception e) {e.printStackTrace();}}public static void deleteTopic(String topicName) {// Kafka 配置Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip);try (AdminClient adminClient = AdminClient.create(props)) {// 要删除的主题名称//   String topicName = "myTopic";// 删除主题DeleteTopicsResult deleteResult = adminClient.deleteTopics(Collections.singletonList(topicName));deleteResult.all().get();System.out.println("Topic deleted successfully: " + topicName);} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}public static void getList() {// Kafka 配置Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip);try (AdminClient adminClient = AdminClient.create(props)) {// 列出所有主题ListTopicsOptions options = new ListTopicsOptions();// 是否包括内部主题,默认为 falseoptions.listInternal(true);ListTopicsResult topicsResult = adminClient.listTopics(options);Set<String> topics = topicsResult.names().get();System.out.println("Existing topics:");for (String topic : topics) {System.out.println(topic);}} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}public static void getListDetail() {// Kafka 配置Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip);try (AdminClient adminClient = AdminClient.create(props)) {// 列出所有主题ListTopicsOptions options = new ListTopicsOptions();// 是否包括内部主题,默认为 falseoptions.listInternal(true);KafkaFuture<Set<String>> topics = adminClient.listTopics(options).names();System.out.println("Existing topics:");for (String topic : topics.get()) {System.out.println(topic);// 获取主题的详细信息(包括分区情况)/*回退jdk1.8 版本KafkaFuture<TopicDescription> topicDescription = adminClient.describeTopics(Set.of(topic)).values().get(topic);printTopicDetails(topicDescription.get());*/Set<String> topicSet = new HashSet<>();topicSet.add(topic);KafkaFuture<TopicDescription> topicDescriptionFuture = adminClient.describeTopics(topicSet).values().get(topic);TopicDescription topicDescription = topicDescriptionFuture.get();printTopicDetails(topicDescription);}} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}private static void printTopicDetails(TopicDescription topicDescription) {System.out.println("Topic: " + topicDescription.name());System.out.println("Partitions:");for (TopicPartitionInfo partition : topicDescription.partitions()) {System.out.printf("  Partition %d, Leader: %d, Replicas: %s, Isrs: %s%n",partition.partition(),partition.leader().id(),partition.replicas(),partition.isr());}System.out.println();}
}
http://www.lryc.cn/news/388639.html

相关文章:

  • OOAD的概念
  • Day47
  • 【面试系列】后端开发工程师 高频面试题及详细解答
  • mac|浏览器链接不上服务器但可以登微信
  • Spring Cloud Alibaba之负载均衡组件Ribbon
  • tkinter显示图片
  • 000.二分查找算法题解目录
  • 数据资产赋能企业决策:通过精准的数据分析和洞察,构建高效的数据资产解决方案,为企业提供决策支持,助力企业实现精准营销、风险管理、产品创新等目标,提升企业竞争力
  • 【java开发环境】多版本jdk 自由切换window和linux
  • MySQL实训项目——餐饮点餐系统
  • 昇思MindSpore学习总结七——模型训练
  • AI时代创新潮涌,从探路到引路,萤石云引领千行百业创新
  • 计算机毕业设计Python深度学习美食推荐系统 美食可视化 美食数据分析大屏 美食爬虫 美团爬虫 机器学习 大数据毕业设计 Django Vue.js
  • 【鸿蒙学习笔记】鸿蒙ArkTS学习笔记
  • 广东行政职业学院数据智能订单班开班暨上进双创工作室签约仪式圆满结束
  • python与matlab微分切片的区别
  • MSPG3507——蓝牙接收数据显示在OLED,滴答定时器延时500MS
  • Linux 安装 Redis 教程
  • 【高考志愿】建筑学
  • Kubernetes的发展历程:从Google内部项目到云原生计算的基石
  • /proc/config.gz
  • 论坛万能粘贴手(可将任意文件转为文本)
  • 学习笔记——动态路由——OSPF(OSPF协议的工作原理)
  • Mybatis1(JDBC编程和ORM模型 MyBatis简介 实现增删改查 MyBatis生命周期)
  • 论文阅读YOLO-World: Real-Time Open-Vocabulary Object Detection
  • SM2的签名值byte数组与ASN.1互转
  • 云计算与生成式AI的技术盛宴!亚马逊云科技深圳 Community Day 社区活动流程抢先知道!
  • 【鸿蒙学习笔记】基础组件Progress:进度条组件
  • 前程无忧滑块
  • 一站式uniapp优质源码项目模版交易平台的崛起与影响