需求:后台编辑推送消息,前台实时接收消息
下面是js实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| $(function() { bdipOnline(); });
var bdipWebSocket; var bdipReconnectTime = 5000; function bdipOnline() { if (window.WebSocket) { var protocol = publicJS.protocol == "https"? "wss": "ws"; try{ bdipWebSocket = new WebSocket(encodeURI(protocol + '://' + publicJS.host +'/ws')); }catch (err){ console.log("online链接websocket失败"); }
if(bdipWebSocket){
bdipWebSocket.onopen = function() { console.log("bdipWebSocket链接成功... bdipReconnectTime :" + bdipReconnectTime) setTimeout(bdipOnline, bdipReconnectTime); };
bdipWebSocket.onerror = function() { console.log("bdipWebSocket发生错误... bdipReconnectTime :" + bdipReconnectTime) setTimeout(bdipOnline, bdipReconnectTime); bdipReconnectTime += 5000; }; bdipWebSocket.onclose = function() { console.log("bdipWebSocket断开链接... bdipReconnectTime :" + bdipReconnectTime) setTimeout(bdipOnline, bdipReconnectTime); bdipReconnectTime += 5000; }; bdipWebSocket.onmessage = function(message) { console.log("接收到消息,消息内容为:" + message); var data = JSON.parse(message.data); if (data.type == 'logMessage') { notificate(data.data); } }; } } }
function notificate(_message) { var msgContent = JSON.parse(_message.content) var title = msgContent.title; if(title == null || typeof(title) == 'undefined' || title == '') { title = "通知中心"; } var content = msgContent.content; if(content.length > 140) { content = content.substr(0, 140); } $("#notification-title").html(title); $("#notification-content").html(content); $(".notification-panel").slideDown(1000); setTimeout(function(){ $(".notification-panel").slideUp(1000); }, 5000); }
|
下面是后台代码,先说一下实现思路:
1.在后台管理系统中编辑待推送数据并完成推送消息操作;
2.spring websocket 检测到用户登录,按某种规则将用户信息保存到redis;
3.定时轮训查看待推送信息,当检测到用户在线时进行推送,更新数据库为已推送;
其中使用到了使用redis(订阅和发布)功能进行数据推送,下面上代码:
spring websocket handler 监听用户登录的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| package com.bim.bdip.cloud.home.web;
import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.bim.bdip.cloud.home.constant.BizServiceDefine; import com.bim.bdip.cloud.home.constant.ConstantDefine; import com.bim.bdip.cloud.home.core.base.Parameter; import com.bim.bdip.cloud.home.domain.api.ApiResultEntity; import com.bim.bdip.cloud.home.model.BimMessageCenter; import com.bim.bdip.cloud.home.model.BimUser; import com.bim.bdip.cloud.home.provider.IBizProvider; import org.apache.ibatis.session.RowBounds; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import java.io.IOException; import java.util.List; import java.util.Map;
public class WebSocketHandler extends TextWebSocketHandler { private static final Logger logger = LogManager.getLogger(WebSocketHandler.class); @Autowired private RedissonClient redissonClient;
@Autowired private IBizProvider bizProvider;
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
}
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session); Map<String, Object> attributes = session.getAttributes();
BimUser user = (BimUser) attributes.get(ConstantDefine.LOGIN_SESSION_USER);
if (user == null) { session.close(); } else { System.out.println("检测到用户登录:" + user.getId() + "当前线程ID为:" + Thread.currentThread().getId()); Long userId = user.getId(); String topic = String.format("notification:%d", userId); RTopic<BimMessageCenter> rtopic = redissonClient.getTopic(topic); rtopic.removeAllListeners();
String listenerKey = String.format("notification:%d:listeners", user.getId()); redissonClient.getSet(listenerKey).delete(); Integer listenerId = rtopic.addListener(new MessageListener<BimMessageCenter>() { @Override public void onMessage(String channel, BimMessageCenter msg) { System.out.println("向用户推送消息:" + msg); JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "logMessage"); jsonObject.put("data", msg); TextMessage textMessage = new TextMessage(jsonObject.toJSONString()); try { session.sendMessage(textMessage); } catch (IOException e) { logger.error("Websocket Error.", e); } } });
redissonClient.getSet(listenerKey).add(listenerId); attributes.put(ConstantDefine.LISTENER_ID, listenerId); } }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { Map<String, Object> attributes = session.getAttributes();
BimUser user = (BimUser) attributes.get(ConstantDefine.LOGIN_SESSION_USER); Integer listenerId = (Integer) attributes.get(ConstantDefine.LISTENER_ID); if (user != null) { System.out.println("用户退出:" + user.getId()); super.afterConnectionClosed(session, status); String topic = String.format("notification:%d", user.getId()); String listenerKey = String.format("notification:%d:listeners", user.getId()); redissonClient.getSet(listenerKey).remove(listenerId); RTopic<BimMessageCenter> rtopic = redissonClient.getTopic(topic); rtopic.removeListener(listenerId); } }
}
|
下面是定时任务,定时任务轮训待推送数据,如果检测到用户登录(通过redis),就进行数据推送(通过redis topic):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.bim.bdip.cloud.home.scheduled;
import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.bim.bdip.cloud.home.model.BimMessageCenter; import com.bim.bdip.cloud.home.service.IBimMessageCenterService; import org.apache.ibatis.session.RowBounds; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service;
import java.util.Date; import java.util.List;
@Service @EnableScheduling public class NotificationTask {
@Autowired private RedissonClient redissonClient;
@Autowired private IBimMessageCenterService messageCenterService;
@Scheduled(cron="1 * * * * ?") public void scanNotifications() {EntityWrapper<BimMessageCenter> wrapper = new EntityWrapper<>(); RowBounds rowBounds = new RowBounds(0, 10000000); wrapper.where("read_status = {0}", 0).and("notification_status = {0}", 0).and("message_type = {0}", "logMessage");
List<BimMessageCenter> messageList = messageCenterService.selectPage(rowBounds, wrapper); System.out.println("轮训检测数据库数据" + messageList.size() + "当前线程ID为:" + Thread.currentThread().getId() + "当前时间为" + new Date().getTime()); for(BimMessageCenter message : messageList) { String listenerKey = String.format("notification:%d:listeners", message.getReceiveId()); if(redissonClient.getSet(listenerKey).size() > 0) { System.out.println("检测到用户" + message.getReceiveId() + "在线,推送消息"); String topic = String.format("notification:%d", message.getReceiveId()); RTopic<BimMessageCenter> rTopic = redissonClient.getTopic(topic); rTopic.publish(message); if(message.getNotificationStatus() != 2){ message.setNotificationStatus(2); this.messageCenterService.update(message); } } } System.out.println("轮训结束,当前时间为:" + new Date().getTime()); }
}
|