当前位置: 首页 > news >正文

kafka消息队列简单使用

下面是使用Spring Boot和Kafka实现消息队列的简单例子:

  1. 引入依赖

在pom.xml中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.5</version>
</dependency>
  1. 配置Kafka

在application.properties中添加Kafka的相关配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  1. 发送消息

创建一个生产者类,使用KafkaTemplate发送消息:

@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
  1. 接收消息

创建一个消费者类,使用@KafkaListener注解监听指定的主题,处理消息:

@Service
public class KafkaConsumerService {@KafkaListener(topics = "myTopic", groupId = "myGroup")public void onMessage(String message) {System.out.println("Received message: " + message);}
}
  1. 测试

在Controller中调用生产者发送消息,然后在控制台中可以看到消费者接收到的消息:

@RestController
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@GetMapping("/send")public String sendMessage() {kafkaProducerService.sendMessage("myTopic", "Hello, Kafka!");return "Message sent successfully";}
}

以上就是一个简单的使用Spring Boot和Kafka实现消息队列的例子

分区

  1. 编写Kafka生产者代码,使用KafkaTemplate发送消息,并指定分区号。如下所示:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message, int partition) {kafkaTemplate.send("my-topic", partition, null, message);

2.编写Kafka消费者代码,使用@KafkaListener注解监听指定的主题,并在方法参数中获取分区号。如下所示:

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println("Received message: " + record.value() + ", partition: " + partition);
http://www.lryc.cn/news/178429.html

相关文章:

  • 性能优化实战使用CountDownLatch
  • 基于视频技术与AI检测算法的体育场馆远程视频智能化监控方案
  • leetcodetop100(29) K 个一组翻转链表
  • 最新影视视频微信小程序源码-带支付和采集功能/微信小程序影视源码PHP(更新)
  • C++:vector 定义,用法,作用,注意点
  • Firecamp2.7.1exe安装与工具调试向后端发送SocketIO请求
  • MySQL到TiDB:Hive Metastore横向扩展之路
  • 算法通关村-----寻找祖先问题
  • Sentinel结合Nacos实现配置持久化(全面)
  • Verilog中什么是断言?
  • Oracle分区的使用详解:创建、修改和删除分区,处理分区已满或不存在的插入数据,以及分区历史数据与近期数据的操作指南
  • SLAM从入门到精通(amcl定位使用)
  • 【C/C++】C/C++面试八股
  • Scala第八章节
  • k8s-实战——kubeadm二进制编译
  • vite 和 webpack 的区别
  • 传统遗产与技术相遇,古彝文的数字化与保护
  • 多维时序 | MATLAB实现WOA-CNN-GRU-Attention多变量时间序列预测(SE注意力机制)
  • 1042 字符统计
  • 3 OpenCV两张图片实现稀疏点云的生成
  • 在Springboot项目中使用Redis提供给Lua的脚本
  • 分类预测 | MATLAB实现NGO-CNN北方苍鹰算法优化卷积神经网络数据分类预测
  • Linux或Centos查看CPU和内存占用情况_top只能查看对应的命令_如何查看具体进程---linux工作笔记062
  • 什么是DevOps
  • 力扣每日一题
  • 测试OpenCvSharp库的模板匹配功能
  • 网络编程day04(网络属性函数、广播、组播、TCP并发)
  • HALCON支持GPU加速的算子有哪些?
  • MacBook Pro 电池电量限制充电怎么设置AlDente Pro for Mac最大充电限制工具
  • 毕业设计选题之Java+springboot线上蔬菜销售与配送系统(源码+调试+开题+lw)