当前位置: 首页 > news >正文

即时通讯增加kafka渠道

此次给im服务增加kafka渠道,刚好最近有对SpringCloudStream进行了解,刚好用来练练手

增加kafka渠道

  • pom.xml

引入stream相关依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

由于涉及到SpringCloud,可以交由spring-cloud-dependencies统一管理

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.7.18</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.8</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
  • application.yml

在yml中对stream相关项进行配置

server:port: 18080
cus:ws:exclude-receiver-info-flag: truereceiver-excludes-himself-flag: true#更改渠道为streamcommunication-type: streamcloud:function:#允许stream访问的beandefinition: listenerstream:kafka:binder:#kafka链接信息brokers: ${KAFKA_BROKERS:127.0.0.1:9092}#允许自动创建topicauto-create-topics: truebindings:#消费者bean-in-indexlistener-in-0:#主题destination: TEST_TOPIC
  • RedisSendExecutor

kafka生产者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.constant.ImConstants;
import com.example.im.infra.executor.send.AbstractBaseSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.executor.send.dto.ScopeOfSendingEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;/*** @author PC* 消息队列执行*/
@Component
public class StreamSendExecutor extends AbstractBaseSendExecutor {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private final StreamBridge streamBridge;@Autowiredpublic StreamSendExecutor(StreamBridge streamBridge) {this.streamBridge = streamBridge;}@Overridepublic String getCommunicationType() {return ImConstants.CommunicationType.STREAM;}@Overridepublic void sendToUser(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.USER);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}@Overridepublic void sendToAll(String sendUserName, String message) {MessageInfo messageInfo = new MessageInfo();messageInfo.setSendUserName(sendUserName);messageInfo.setMessage(message);messageInfo.setScopeOfSending(ScopeOfSendingEnum.ALL);logger.debug("send to user stream websocket, topic is " + "TEST_TOPIC");streamBridge.send("TEST_TOPIC", messageInfo);}
}
  • StreamMessageListener

kafka消费者

package com.example.im.infra.executor.send.stream;import com.example.im.infra.executor.send.DefaultSendExecutor;
import com.example.im.infra.executor.send.dto.MessageInfo;
import com.example.im.infra.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;
import java.util.function.Function;/*** @author PC* 消息队列监听*/
@Component
public class StreamMessageListener {private final static Logger logger = LoggerFactory.getLogger(StreamSendExecutor.class);private DefaultSendExecutor defaultSendExecutor;@Autowiredpublic void setDefaultSendExecutor(DefaultSendExecutor defaultSendExecutor) {this.defaultSendExecutor = defaultSendExecutor;}@Beanpublic Function<Flux<Message<byte[]>>, Mono<Void>> listener() {return messageInfoFlux -> messageInfoFlux.map(message -> {String messageJson = new String(message.getPayload(), StandardCharsets.UTF_8);MessageInfo messageInfo = JsonUtils.toObjectByTypeReference(messageJson, new TypeReference<MessageInfo>() {});switch (messageInfo.getScopeOfSending()) {case USER:defaultSendExecutor.sendToUser(messageInfo.getSendUserName(), messageInfo.getMessage());break;case ALL:defaultSendExecutor.sendToAll(messageInfo.getSendUserName(), messageInfo.getMessage());break;default://一般来说不会出现该情况,除非用户覆盖了ScopeOfSending,后续可以开个扩展发送范围的口子logger.warn("invalid sending range:" + messageInfo.getScopeOfSending().getScopeCode());break;}return messageInfo;}).then();}
}

测试

test2向test1发送消息,成功接收

直接在消息队列中发送消息,test1也接收到了消息

参考资料

[1].SpringCloudStream中文文档

[2].im项目

http://www.lryc.cn/news/462641.html

相关文章:

  • 建造者模式和工厂模式的区别
  • GEE数据集——ERA5-陆地每日汇总--ECMWF气候再分析数据集
  • Spring Boot 中的 @RequestMapping 和 Spring 中的 @RequestMapping 有什么区别?
  • PROFINET开发或EtherNet/IP开发嵌入式归一板有用于工业称重秤
  • 【Kafka】Kafka源码解析之producer过程解读
  • 深度学习笔记20_数据增强
  • 模板变量与php变量对比做判断
  • C语言 | Leetcode C语言题解之第485题最大连续1的个数
  • C语言复习概要(六)
  • PyQt 入门教程(2)搭建开发环境
  • Flink Kubernetes Operator
  • 【最新华为OD机试E卷-支持在线评测】字符统计及重排(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)
  • springboot使用GDAL获取tif文件的缩略图并转为base64
  • Pytorch——pip下载安装pytorch慢的解决办法
  • uniapp微信小程序调用百度OCR
  • Vue3+TS项目---实用的复杂类型定义总结
  • 尚硅谷rabbitmq2024 工作模式路由篇 第11节 答疑
  • HTTP vs WebSocket
  • R语言医学数据分析实践-数据读写
  • JavaWeb环境下Spring Boot在线考试系统的优化策略
  • ETL技术在金蝶云星空与旺店通WMS集成中的应用
  • 【力扣热题100】3194. 最小元素和最大元素的最小平均值【Java】
  • 机器学习拟合过程
  • 如何快速部署一套智能化openGauss测试环境
  • 【设计模式】深入理解Python中的原型设计模式
  • Django CORS配置方案
  • 2024年开放式耳机哪个牌子好?推荐最好的顶级开放式耳机品牌
  • 零基础读懂Stable Diffusion!
  • Hash Join 和 Index Join工作原理和性能差异
  • Apifox简介及使用