当前位置: 首页 > article >正文

centos7.9使用docker-compose安装kafka

docker-compose配置文件

# docker-compose for single-broker Kafka + ZooKeeper + kafka-manager UI.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      # External listener advertises the host IP; internal one is used broker<->broker.
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      # Single-broker setup, so the offsets topic can only have RF=1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      - "9002:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      # Fixed: was "kAFKA_BROKERS" (lowercase leading k), which the container never reads.
      KAFKA_BROKERS: 192.168.1.85:9092
      KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

# Spring Boot application name.
spring.application.name=kafka_demo
# --- Kafka connection & serialization settings ---
# Broker address; must match KAFKA_ADVERTISED_LISTENERS in docker-compose.
spring.kafka.bootstrap-servers=192.168.1.85:9092
# Default consumer group id for this application.
spring.kafka.consumer.group-id=my-group
# Start from the earliest offset when the group has no committed offset yet.
spring.kafka.consumer.auto-offset-reset=earliest
# Producer serializes both key and value as plain strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Consumer deserializes both key and value as plain strings (mirrors the producer).
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint for manually publishing test messages to Kafka.
 */
@RestController
public class KafkaTestController {

    private final KafkaProducerService kafkaProducerService;

    public KafkaTestController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    /**
     * Publishes {@code message} to {@code topic} via the producer service.
     *
     * @param topic   target topic (defaults to {@code test-topic})
     * @param message payload to send (defaults to {@code Hello Kafka!})
     * @return a confirmation string echoing the message and topic
     */
    @GetMapping("/send")
    public String sendMessageToKafka(
            @RequestParam(value = "topic", defaultValue = "test-topic") String topic,
            @RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {
        kafkaProducerService.sendMessage(topic, message);
        return "消息已发送: " + message + " 到主题: " + topic;
    }
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Thin wrapper around {@link KafkaTemplate} for publishing string messages,
 * logging the delivery outcome asynchronously.
 */
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message to the given topic; on completion prints partition/offset
     * on success or the failure reason on error.
     *
     * @param topic   destination topic name
     * @param message message payload
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message).whenComplete((sendResult, failure) -> {
            if (failure == null) {
                System.out.println("消息发送成功: " + message
                        + ", 分区: " + sendResult.getRecordMetadata().partition()
                        + ", 偏移量: " + sendResult.getRecordMetadata().offset());
            } else {
                // NOTE(review): only the message is printed; consider logging the full exception.
                System.err.println("消息发送失败: " + failure.getMessage());
            }
        });
    }
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Listens on a Kafka topic and prints every received record.
 * Topic and group id are resolved from configuration, with fallbacks
 * {@code test-topic} / {@code test-group}.
 */
@Service
public class KafkaConsumerService {

    // groupId distinguishes consumer groups; each group receives every message once.
    @KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")
    public void consumeMessage(ConsumerRecord<String, String> record) {
        System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Business-specific processing goes here.
    }
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.yykj</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>kafka_demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot Starter for Kafka.
             Fixed: the explicit <version>3.3.6</version> was removed — spring-boot-starter-parent
             already manages the spring-kafka version, and pinning it can produce an
             incompatible Boot/spring-kafka combination. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <!-- Lombok is compile-time only; keep it out of the runnable jar. -->
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

TiCDC changefeed 的创建（将 TiDB 数据同步到上面部署的 kafka）:

# Create a TiCDC changefeed that replicates TiDB changes into the Kafka topic
# "test-tidbmessage" using the Debezium output protocol.
# NOTE(review): despite the article heading, this starts TiCDC replication — it does
# not start Kafka itself (Kafka is started by docker-compose above).
tiup cdc cli changefeed create   --server=http://192.168.1.85:8300   --changefeed-id="kafka-debezium"   --sink-uri="kafka://192.168.1.85:9092/test-tidbmessage?protocol=debezium&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"   --config=config-cdc.yml

config-cdc.yml配置

# TiCDC changefeed configuration (config-cdc.yml).
# Replicate tables even when they lack a valid primary key / unique index.
force-replicate=true
[filter]
# Only replicate these three tables under the `law` database.
rules = ['law.sys_dict', 'law.sys_user', 'law.sys_role']

http://www.lryc.cn/news/2387193.html

相关文章:

  • ETL 工具与数据中台的关系与区别
  • SQLMesh Typed Macros:让SQL宏更强大、更安全、更易维护
  • DeepSpeed-Ulysses:支持极长序列 Transformer 模型训练的系统优化方法
  • Docker 使用镜像[SpringBoot之Docker实战系列] - 第537篇
  • 解锁MCP:AI大模型的万能工具箱
  • Error in beforeDestroy hook: “Error: [ElementForm]unpected width “
  • vscode包含工程文件路径
  • 私有知识库 Coco AI 实战(七):摄入本地 PDF 文件
  • GitLab 18.0 正式发布,15.0 将不再受技术支持,须升级【二】
  • NtfsLookupAttributeByName函数分析之和Scb->AttributeName的关系
  • STM32H7系列USART驱动区别解析 stm32h7xx_hal_usart.c与stm32h7xx_ll_usart.c的区别?
  • 网络原理 | TCP与UDP协议的区别以及回显服务器的实现
  • IP动态伪装开关
  • 【Unity3D】将自动生成的脚本包含到C#工程文件中
  • 解决leetcode第3509题.最大化交错和为K的子序列乘积
  • 【Python 深度学习】1D~3D iou计算
  • java23
  • 嵌入式工程师常用软件
  • LitCTF2025 WEB
  • Redisson WatchDog会一直续期吗?
  • Linux 下VS Code 的使用
  • Android开发namespace奇葩bug
  • watchEffect
  • Qt 布局管理器的层级关系
  • Android 之 kotlin 语言学习笔记一
  • maven模块化开发
  • 为什么要使用stream流
  • 语义分割的image
  • 云原生安全之网络IP协议:从基础到实践指南
  • C++——QT 文件操作类