springboot集成kafka
1、引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.6</version></dependency>
2、配置
server:port: 9099
spring:kafka:bootstrap-servers: 192.168.157.101:9092consumer:group-id: test-consumer-groupmax-poll-records: 10concurrency: 10#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】auto-offset-reset: earliest#是否开启自动提交enable-auto-commit: falseack-mode: MANUAL_IMMEDIATE#自动提交的时间间隔auto-commit-interval: 1000#key的解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#value的解码方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:batch-listener: trueproducer:batch-size: 4096buffer-memory: 40960retries: 1value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializerlistener:#创建多少个consumer,值必须小于等于Kafk Topic的分区数。ack-mode: MANUAL_IMMEDIATEconcurrency: 1 #推荐设置为topic的分区数
3、测试
@AutowiredKafkaTemplate<String,String> kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 100; i++) {kafkaTemplate.send("hello","hello"+i);}}
如日志打印:Connection to node 0 (/127.0.0.1:9092)
#进入kafka安装目录
/usr/local/kafka_2.12-3.5.1/config
#编辑配置文件
vim server.properties# 允许外部端口连接
listeners=PLAINTEXT://0.0.0.0:9092
# 外部代理地址
advertised.listeners=PLAINTEXT://192.168.157.101:9092
重启kafka服务