一、依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.1.RELEASE</version>
</dependency>
二、配置文件
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: testGroup
      auto-offset-reset: latest
三、API
1、生产者
@Component
public class ProducerMsg {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void send(String topic, String msg) {kafkaTemplate.send(topic, msg);}public void sendCallback(String topic, String msg) {kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onSuccess(SendResult<String, String> stringStringSendResult) {RecordMetadata recordMetadata = stringStringSendResult.getRecordMetadata();final String topic = recordMetadata.topic();final int partition = recordMetadata.partition();final long offset = recordMetadata.offset();System.err.println(String.format("生产消息成功:topic: %s,partition: %s,offset: %s", topic, partition, offset));}@Overridepublic void onFailure(Throwable throwable) {}});}}
2、消费者
/**
 * Spring component that receives Kafka records and prints them to
 * {@code System.err}.
 */
@Component
public class ConsumeMsg {

    /**
     * Listener for the {@code USER} and {@code LOG} topics; prints each
     * received record.
     *
     * @param consumer the record delivered by the listener container
     */
    @KafkaListener(topics = {"USER", "LOG"})
    public void consumeSingle(ConsumerRecord<String, String> consumer) {
        // The original also copied topic() and value() into locals that were
        // never read; those dead assignments are removed.
        System.err.println("监听到kafka消息: " + consumer);
    }

    /**
     * Prints the topic and value of every record in the given list.
     *
     * NOTE(review): this method carries no {@code @KafkaListener} annotation
     * in this file, so presumably it is only run when called explicitly or
     * wired up elsewhere - confirm the intended topics and batch container
     * factory before annotating it.
     *
     * @param consumers records to print; must not be {@code null}
     */
    public void consumeBatch(List<ConsumerRecord<String, String>> consumers) {
        for (ConsumerRecord<String, String> record : consumers) {
            System.err.println(String.format("topic: %s,value: %s", record.topic(), record.value()));
        }
    }
}