当前位置: 首页 > news >正文

netty-socketio 集群随记

实现netty-socketio集群的方式

代码实例

    @PostConstructpublic void subscribe() {pubSubStore.subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {@Overridepublic void onMessage(DispatchMessage message) {log.debug("subscribe: {}" ,message);Collection<SocketIOClient> clients = null;String room = message.getRoom();String namespace = message.getNamespace();Packet packet = message.getPacket();if (!ObjectUtils.isEmpty(namespace)&&!ObjectUtils.isEmpty(room)){SocketIONamespace socketIONamespace = socketIOServer.getNamespace(namespace);if (socketIONamespace != null){clients = socketIONamespace.getRoomOperations(room).getClients();}}else{clients = socketIOServer.getBroadcastOperations().getClients();}if(!CollectionUtils.isEmpty(clients)){clients.parallelStream().forEach(ioClient -> {ioClient.sendEvent(Event.Event,packet.getData());});}}}, DispatchMessage.class);}
}
@Component
@Slf4j
public class ClientUtil {@Resourceprivate SocketIOServer socketIOServer;public void sendEvent(SocketIOClient client, MessageModel model) {log.debug("sendEvent--------------------------:{}", model);client.sendEvent(Event.Event, model);}public void sendNamespaceRoomEventExcluded(SocketIOClient client, MessageModel model) {log.debug("sendNamespaceRoomEventExcluded:{}", model);socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, client, model);}public void sendNamespaceRoomEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getNamespace(model.getNamespace()).getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);}public void sendNamespaceEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getNamespace(model.getNamespace()).getAllClients().parallelStream().forEach(client -> {client.sendEvent(Event.Event, model);});}public void sendRoomEvent(MessageModel model) {log.debug("sendNamespaceRoomEvent:{}", model);socketIOServer.getRoomOperations(model.getRoomId()).sendEvent(Event.Event, model);}
}

源码

查看源码可发现sendEvent方法最终还是调用了dispatch方法,由此可见我们只需要实现订阅就行。

/*** broadcast interface**/
public interface BroadcastOperations extends ClientOperations {Collection<SocketIOClient> getClients();<T> void send(Packet packet, BroadcastAckCallback<T> ackCallback);void sendEvent(String name, SocketIOClient excludedClient, Object... data);void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data);<T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback);<T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback);<T> void sendEvent(String name, Object data, Predicate<SocketIOClient> excludePredicate, BroadcastAckCallback<T> ackCallback);}
    @Overridepublic void sendEvent(String name, Predicate<SocketIOClient> excludePredicate, Object... data) {Packet packet = new Packet(PacketType.MESSAGE, EngineIOVersion.UNKNOWN);packet.setSubType(PacketType.EVENT);packet.setName(name);packet.setData(Arrays.asList(data));for (SocketIOClient client : clients) {packet.setEngineIOVersion(client.getEngineIOVersion());if (excludePredicate.test(client)) {continue;}client.send(packet);}dispatch(packet);}
    private void dispatch(Packet packet) {this.storeFactory.pubSubStore().publish(PubSubType.DISPATCH,new DispatchMessage(this.room, packet, this.namespace));}

http://www.lryc.cn/news/351689.html

相关文章:

  • 查看目录或文件的磁盘使用情况
  • 如何选择合适的自动化框架
  • Java面试进阶指南:高级知识点问答精粹(二)
  • thinkphp 使用模型实现多表连接查询
  • LeetCode674:最长连续递增序列
  • Java技术精粹:高级面试问题与解答指南(一)
  • 数据可视化技术头歌测试合集
  • 【linux-IMX6ULL-字符设备驱动简单框架实验】
  • 3D模型旋转显示不全怎么办---模大狮模型网
  • DLRover:蚂蚁集团开源的AI训练革命
  • ubuntu_概念
  • Oracle递归查询笔记
  • FaceFusion源码框架解读
  • React项目知识积累(三)
  • 前端实现打印功能
  • 创建型模式之工厂模式
  • 「动态规划」按摩师
  • 小程序-滚动触底-页面列表数据无限加载
  • 监控上网的软件有哪些?含泪推荐的电脑监控软件
  • linux系统防火墙开放端口命令
  • WebGL渲染引擎优化方向——渲染帧率的优化
  • 【文献阅读】ESG评级分化和企业绿色创新
  • 2024-5-6-从0到1手写配置中心Config之实现配置中心客户端
  • 【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(十一)
  • Amesim示例篇-案例1:空间中的铝块散热
  • 深度神经网络——什么是自动编码器?
  • 初见flyway
  • 9.6 Go语言入门(数组、切片和指针)
  • Web面试题(一)
  • 【Crypto】一眼就解密