当前位置: 首页 > news >正文

kafka代码示例

安装kafka:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.10.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>

kafka配置:

在 application.properties 添加以下配置:

### kafka生产者
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer### kafka消费者
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5

生产者代码:

  • bean对象:
public class MyMsg {private String id;private String name;//忽略getter、setter
}
  • KafkaProducerService :

生产者发送消息。

@Component
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 发送消息,处理回调。* 在发送消息时会自动创建你设置的 topic。**/public void send()  {MyMsg myMsg = new MyMsg();myMsg.setName("lin");myMsg.setId("1234");//发送消息ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("myTopic1", "key", JSON.toJSONString(myMsg));//处理回调的结果,比如消息发送失败的处理。如果不需要回调,也可以不处理。future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("消息发送失败." + ex);}@Overridepublic void onSuccess(SendResult<String, String> result) {ProducerRecord<String, String> producerRecord = result.getProducerRecord();RecordMetadata recordMetadata = result.getRecordMetadata();System.out.println("消息发送成功.producerRecord:"+ JSON.toJSONString(producerRecord)+ ",recordMetadata:" + JSON.toJSONString(recordMetadata));}});}}
  • 调用生产者发送消息:
@RestController
@RequestMapping("/")
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@PostMapping(value = "/kafka/send")public void send()  {kafkaProducerService.send();}}

消费者代码:

  • KafkaConsumerService:
@Component
public class KafkaConsumerService {/*** Kafka监听器,可以监听消息。* 指定需要监听的 kafka 主题 topics,可以是多个topic.* 指定消费者群组 groupId,可以不写.**/@KafkaListener( topics = {"myTopic1"} , groupId ="myGroup")public void consume(ConsumerRecord<String, String> consumerRecord)  {System.out.println("消费者接收到信息,内容为:" + consumerRecord.value());System.out.println("偏移量:" +  consumerRecord.offset());}}

测试结果 :

调用生产者发送消息,消费者成功接收到消息,类似如下:

消费者接收到信息,内容为:{"id":"1234","name":"lin"}
偏移量:19
http://www.lryc.cn/news/210680.html

相关文章:

  • 文件夹批量改名:如何在文件夹名左边添加递增的自动编号
  • Flash(Animate)和木疙瘩的元件学习和理解
  • C#两个表多条件关联写法
  • VSCode-C/C++环境配置
  • 第八周实验记录
  • Spring Cloud Alibaba Seata 实现 SAGA 事物
  • npm install报错,解决记录
  • LSM树原理详解
  • Linux系统编程_网络编程:字节序、socket、serverclient、ftp 云盘
  • 队列(8.6)
  • 计算机网络 第四章网络层
  • 操作系统运行机制
  • mathtype7.4破解永久激活码
  • 66 内网安全-域横向批量atschtasksimpacket
  • PCI9054入门1:硬件引脚定义、时序、FPGA端驱动源码
  • 多媒体应用设计师 第17章 多媒体应用场景的技术应用和实现示例
  • react151618刷新几次的问题
  • 【Spring】IOC容器与Bean的常用属性配置
  • 2023年下半年 系统集成项目管理工程师 真题考点(一二三四批次)(10月28、29)(网友回忆版)
  • 读韩都衣舍,谈权力转移的激励制度
  • 私有云:【10】VCenter安装win10
  • [Java/力扣100]判断两棵二叉树是否相同
  • BEC商务英语主题 定价策略|柯桥学商务英语口语
  • 第七章 ObjectScript 一般系统限制
  • 【Python百练——第1练】使用Python求100以内的所有偶数
  • springboot心理咨询管理系统
  • Java-API简析_java.net.URL类(基于 Latest JDK)(浅析源码)
  • C语言浮点型在内存中的存储
  • elementPlus | el-tabs 标签管理路由页面
  • 如何使用ffmpeg制作透明背景的视频