WebSocket消息推送
创建WebSocket工具类
package org.jmis.riskassess.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@ServerEndpoint(value = "/message-service")
public class WebSocketUtil {private static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();private static final Logger logger = LoggerFactory.getLogger(WebSocketUtil.class);public static void pushMessage(String userId, String message) {Session session = sessions.get(userId);if (session != null && session.isOpen()) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {logger.error("Failed to send message to userId: " + userId, e);}} else {// 会话失效,从会话集合中移除sessions.remove(userId, session);
// logger.warn("Session is invalid for userId: " + userId + ", removing from sessions");}}@OnOpenpublic void onOpen(Session session, EndpointConfig config) {Map<String, List<String>> queryParams = session.getRequestParameterMap();String userId = queryParams.get("userId").get(0);sessions.put(userId, session);logger.info("WebSocket opened: " + session.getId() + ", userId: " + userId);}@OnClosepublic void onClose(Session session) {String closedSessionId = session.getId();sessions.entrySet().removeIf(entry -> entry.getValue().getId().equals(closedSessionId));logger.info("WebSocket closed: " + closedSessionId);}@OnMessagepublic void onMessage(String message, Session session) {String userId = (String) session.getUserProperties().get("userId");if (userId == null) {session.getUserProperties().put("userId", message);logger.info("User ID saved: " + message);}}public static int getSessionCount() {return sessions.size();}public static int getOpenConnectionCount() {int openConnectionCount = 0;for (Session session : sessions.values()) {if (session.isOpen()) {openConnectionCount++;}}return openConnectionCount;}public static List<Session> getOpenConnections() {List<Session> openConnections = new ArrayList<>();for (Session session : sessions.values()) {if (session.isOpen()) {openConnections.add(session);}}return openConnections;}
}
创建WebSocket配置文件
package org.jmis.riskassess.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration class that contributes a {@link ServerEndpointExporter} bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Declares the exporter bean.
     *
     * @return a newly created {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
业务逻辑
package org.jmis.riskassess.safeSystemDataTask;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jmis.riskassess.config.WebSocketUtil;
import org.jmis.riskassess.entity.Message;
import org.jmis.riskassess.entity.MessageUser;
import org.jmis.riskassess.pojo.MessageInfo;
import org.jmis.riskassess.pojo.MineFoundation;
import org.jmis.riskassess.service.IMessageService;
import org.jmis.riskassess.service.IMessageUserService;
import org.jmis.riskassess.vo.AlarmRealtimeVO;
import org.jmis.riskassess.vo.DzAlarmAcceptVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springjmis.core.tool.utils.Func;
import org.springjmis.core.tool.utils.ObjectUtil;import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Component
public class MessagePushTask {@Autowiredprivate WebSocketUtil webSocketUtil;@Autowiredprivate IMessageUserService messageUserService;@Autowiredprivate IMessageService messageService;@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void pushUnreadMessages() {// 查询未读消息的用户以及消息内容List<Map<String,Object>> userIds = messageUserService.findUsersWithUnreadMessages();// 推送未读消息给相应的用户for (Map<String, Object> user : userIds) {Long idLong = (Long) user.get("id");String id = String.valueOf(idLong);String userId = user.get("user_id").toString();Object isImportant= user.get("is_important");String messageName = (String) user.get("message_name");String details = (String) user.get("details");String messageIdLong = user.get("message_id").toString();String messageId = String.valueOf(messageIdLong);String type = (String) user.get("type");// 构造推送消息的内容JSONObject pushMessage = new JSONObject();pushMessage.put("messageId", messageId);pushMessage.put("messageName", messageName);pushMessage.put("details", details);pushMessage.put("userId", userId);pushMessage.put("type", type);pushMessage.put("isImportant", isImportant);pushMessage.put("id", id);// 将推送消息转换为JSON字符串String jsonMessage = pushMessage.toJSONString();// 推送消息给用户WebSocketUtil.pushMessage(userId, jsonMessage);}}//甲烷报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void alarmMessages() {// 查询未读消息的用户以及消息内容List<AlarmRealtimeVO> alarmRealtimeVOS = messageUserService.alarmMessages();// 推送未读消息给相应的用户for (AlarmRealtimeVO realtimeVO : alarmRealtimeVOS) {String id = realtimeVO.getAlarmid();String userId = realtimeVO.getUserId();String messageName = "超限报警";String details = "报警类型:"+realtimeVO.getAlarmType()+","+"测点类型:"+realtimeVO.getSensorname()+","+"报警地点:"+realtimeVO.getNodeplace()+","+"报警开始时间:"+realtimeVO.getStartTime()+","+"报警持续时长:"+realtimeVO.getTimeLong()+","+"报警最大值:"+realtimeVO.getMaxdata();saveMessage(id,messageName,details,"jkAlarm");saveMessageUser(id,userId);}}//矿井超员报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void 
mineOverAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> messageInfos = messageUserService.mineOverAlarmMessages();// 推送未读消息给相应的用户for (MessageInfo realtimeVO : messageInfos) {saveMessage(realtimeVO.getId(),realtimeVO.getMessageName(),realtimeVO.getDetails(),"ryAlarm");saveMessageUser(realtimeVO.getId(),realtimeVO.getUserId());}}//地震报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void dzAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> messageInfos = messageUserService.dzAlarmMessages();// 推送未读消息给相应的用户for (MessageInfo realtimeVO : messageInfos) {String id = realtimeVO.getId();List<String> list = Func.toStrList(realtimeVO.getUserId());String messageName = "地震报警";String details = realtimeVO.getDetails();saveMessage(id,messageName,details,"dzAlarm");for (String s : list) {saveMessageUser(id,s);}}}//天气报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void tqAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> tqAlarmMessages = messageUserService.tqAlarmMessages();// 存入message表和messageUser表for (MessageInfo realtimeVO : tqAlarmMessages) {String id = realtimeVO.getId();List<String> list = Func.toStrList(realtimeVO.getUserId());String messageName = realtimeVO.getMessageName();String details = realtimeVO.getDetails();saveMessage(id,messageName,details,"tqAlarm");for (String s : list) {saveMessageUser(id,s);}}}public void saveMessage(String id ,String messageName,String details,String type){List<String> list = messageService.messageIdList();if (!list.contains(id)){Message message=new Message();message.setMessageId(id);message.setMessageName(messageName);message.setType(type);message.setDetails(details);messageService.save(message);}}public void saveMessageUser(String id,String userId){List<MessageUser> messageUserServiceOne= messageUserService.getMessageUser(id,userId);if (CollectionUtil.isEmpty(messageUserServiceOne)) {MessageUser messageUser = new MessageUser();messageUser.setUserId(userId);messageUser.setMessageId(id);messageUserService.save(messageUser);}}}