Browse Source

连接的计时器

luoynagwei 9 months ago
parent
commit
7409ef643a

+ 4 - 12
api-websocket/src/main/java/com/sikey/wa04/api/websocket/client/handler/WebSocketHandler.java

@@ -1,26 +1,18 @@
 package com.sikey.wa04.api.websocket.client.handler;
 
-import com.sikey.wa04.business.application.service.HeartbeatService;
+import com.sikey.wa04.business.application.service.websocket.ConnectionHeartbeatService;
 import com.sikey.wa04.business.application.service.websocket.ConnectionManagerService;
-import com.sikey.wa04.business.application.usecase.HeartbeatUsecase;
-import com.sikey.wa04.business.application.usecase.OfflineMessageLoadingUsecase;
-import com.sikey.wa04.business.application.usecase.WebSocketConnectionUsecase;
-import com.sikey.wa04.business.domain.entity.Session;
 import com.sikey.wa04.common.constants.TraceConstant;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.MDC;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.web.socket.CloseStatus;
 import org.springframework.web.socket.PongMessage;
 import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
-import org.springframework.web.socket.adapter.standard.StandardWebSocketSession;
 import org.springframework.web.socket.handler.TextWebSocketHandler;
 
-import java.util.UUID;
-
 /**
  * WebSocket处理器
  *
@@ -31,7 +23,7 @@ import java.util.UUID;
 public class WebSocketHandler extends TextWebSocketHandler {
 
     @Resource
-    private HeartbeatService heartbeatService;
+    private ConnectionHeartbeatService connectionHeartbeatService;
 
     @Resource
     private ConnectionManagerService connectionManagerService;
@@ -49,7 +41,7 @@ public class WebSocketHandler extends TextWebSocketHandler {
     @Override
     protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage textMessage) {
         MDC.put(TraceConstant.REQUEST_ID, getSecWebSocketKey(webSocketSession));
-        heartbeatService.handleCustomizedMessage();
+        connectionHeartbeatService.handleCustomizedMessage(webSocketSession);
     }
 
     /**
@@ -87,6 +79,6 @@ public class WebSocketHandler extends TextWebSocketHandler {
      */
     @Override
     protected void handlePongMessage(WebSocketSession webSocketSession, PongMessage pongMessage) throws Exception {
-        heartbeatService.handlePongMessage(webSocketSession);
+        connectionHeartbeatService.handlePongMessage(webSocketSession);
     }
 }

+ 24 - 0
api-websocket/src/main/java/com/sikey/wa04/api/websocket/scheduler/KeepItAliveScheduler.java

@@ -0,0 +1,24 @@
+package com.sikey.wa04.api.websocket.scheduler;
+
+import com.sikey.wa04.business.application.service.websocket.ConnectionManagerService;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class KeepItAliveScheduler {
+
+    @Resource
+    private ConnectionManagerService connectionManagerService;
+
+
+    /**
+     * 检查Session上次活跃时间 <b>10秒检查一次</b>
+     */
+    @Scheduled(cron = "*/10 * * * * *")
+    public void run() {
+        connectionManagerService.removeDeadConnection();
+    }
+}

+ 6 - 0
business-application/src/main/java/com/sikey/wa04/business/application/converter/ConnectionConverter.java

@@ -11,6 +11,7 @@ import lombok.experimental.UtilityClass;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.socket.WebSocketSession;
 
+import java.time.Instant;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
@@ -48,7 +49,12 @@ public class ConnectionConverter {
         return new Connection()
                 .setUserId(id)
                 .setId(new ConnectionId(webSocketKey))
+                .setLastHeartbeat(Instant.now())
                 .setLocale(locale);
     }
 
+    public ConnectionId toConnectionId(WebSocketSession session) {
+        String webSocketKey = session.getHandshakeHeaders().get("Sec-WebSocket-Key").getFirst();
+        return new ConnectionId(webSocketKey);
+    }
 }

+ 0 - 30
business-application/src/main/java/com/sikey/wa04/business/application/service/HeartbeatService.java

@@ -1,30 +0,0 @@
-package com.sikey.wa04.business.application.service;
-
-import com.sikey.wa04.business.domain.adapter.SessionSendMessageAdapter;
-import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-import org.springframework.web.socket.PingMessage;
-import org.springframework.web.socket.WebSocketSession;
-
-@Slf4j
-@Service
-public class HeartbeatService {
-
-    @Resource
-    private SessionSendMessageAdapter sessionSendMessageAdapter;
-
-    public void handleCustomizedMessage() {
-        sessionSendMessageAdapter.send(null);
-    }
-
-    /**
-     * 处理 Ping 消息
-     *
-     * @param session WebSocketSession
-     * @throws Exception 异常
-     */
-    public void handlePongMessage(WebSocketSession session) throws Exception {
-        session.sendMessage(new PingMessage());
-    }
-}

+ 56 - 0
business-application/src/main/java/com/sikey/wa04/business/application/service/websocket/ConnectionHeartbeatService.java

@@ -0,0 +1,56 @@
+package com.sikey.wa04.business.application.service.websocket;
+
+import com.sikey.wa04.business.application.converter.ConnectionConverter;
+import com.sikey.wa04.business.domain.adapter.SessionSendMessageAdapter;
+import com.sikey.wa04.business.domain.entity.websocket.Connection;
+import com.sikey.wa04.business.infrastructure.adapter.CacheConnectionStateManager;
+import com.sikey.wa04.business.infrastructure.adapter.RedisConnectionStateManager;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.PingMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+
+/**
+ * 心跳
+ *
+ * @author luoyangwei
+ */
+@Slf4j
+@Service
+public class ConnectionHeartbeatService {
+
+    @Resource
+    private CacheConnectionStateManager cacheConnectionStateManager;
+
+    @Resource
+    private RedisConnectionStateManager redisConnectionStateManager;
+
+    /**
+     * 自定义消息处理
+     *
+     * @param session 会话
+     */
+    public void handleCustomizedMessage(WebSocketSession session) {
+        log.info("Refresh heartbeat after receiving message: {}", session);
+
+        Connection connection = ConnectionConverter.to(session);
+        cacheConnectionStateManager.heartbeat(connection);
+        redisConnectionStateManager.heartbeat(connection);
+    }
+
+
+    /**
+     * 处理Ping消息
+     *
+     * @param session 会话
+     * @throws IOException 异常
+     */
+    public void handlePongMessage(WebSocketSession session) throws IOException {
+        log.debug("PongMessage: {}", session);
+        session.sendMessage(new PingMessage());
+    }
+
+}

+ 30 - 1
business-application/src/main/java/com/sikey/wa04/business/application/service/websocket/ConnectionManagerService.java

@@ -2,14 +2,20 @@ package com.sikey.wa04.business.application.service.websocket;
 
 import com.sikey.wa04.business.application.converter.ConnectionConverter;
 import com.sikey.wa04.business.domain.entity.websocket.Connection;
+import com.sikey.wa04.business.domain.entity.websocket.ConnectionId;
 import com.sikey.wa04.business.infrastructure.adapter.CacheConnectionStateManager;
 import com.sikey.wa04.business.infrastructure.adapter.RedisConnectionStateManager;
+import com.sikey.wa04.business.infrastructure.configuration.WebSocketConfig;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.web.socket.CloseStatus;
 import org.springframework.web.socket.WebSocketSession;
 
+import java.time.Duration;
+import java.util.List;
+
 /**
  * 连接管理
  *
@@ -25,6 +31,13 @@ public class ConnectionManagerService {
     @Resource
     private CacheConnectionStateManager cacheConnectionStateManager;
 
+    private WebSocketConfig webSocketConfig;
+
+    @Autowired
+    public void setWebSocketConfig(WebSocketConfig webSocketConfig) {
+        this.webSocketConfig = webSocketConfig;
+    }
+
     /**
      * 连接建立
      */
@@ -42,8 +55,24 @@ public class ConnectionManagerService {
     public void connectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
         log.info("Connection closed code: {}", closeStatus.getCode());
 
-        Connection connection = ConnectionConverter.to(webSocketSession);
+        ConnectionId connectionId = ConnectionConverter.toConnectionId(webSocketSession);
+        Connection connection = cacheConnectionStateManager.getConnection(connectionId);
         cacheConnectionStateManager.offline(connection);
         redisConnectionStateManager.offline(connection);
     }
+
+    /**
+     * 移除死连接
+     */
+    public void removeDeadConnection() {
+        log.info("Remove dead connection");
+
+        List<Connection> connections = cacheConnectionStateManager.getConnections();
+        for (Connection connection : connections) {
+            if (connection.isDead(Duration.ofSeconds(webSocketConfig.getHeartbeatTimeout()))) {
+                cacheConnectionStateManager.offline(connection);
+                redisConnectionStateManager.offline(connection);
+            }
+        }
+    }
 }

+ 19 - 1
business-domain/src/main/java/com/sikey/wa04/business/domain/entity/websocket/Connection.java

@@ -4,6 +4,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.experimental.Accessors;
 
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Locale;
 
 /**
@@ -21,10 +23,26 @@ public class Connection {
      * 连接用户ID
      */
     private String userId;
-    
+
     /**
      * 语言环境
      * 默认: {@link com.sikey.wa04.common.constants.LocaleConstant#DEFAULT_LANGUAGE}
      */
     private Locale locale;
+
+    /**
+     * 上次心跳时间
+     */
+    private Instant lastHeartbeat;
+
+
+    /**
+     * 是否已经死亡
+     *
+     * @param heartbeatTimeout 心跳超时时间
+     * @return 是否已经死亡
+     */
+    synchronized public boolean isDead(Duration heartbeatTimeout) {
+        return lastHeartbeat.plus(heartbeatTimeout).isBefore(Instant.now());
+    }
 }

+ 24 - 1
business-infrastructure/src/main/java/com/sikey/wa04/business/infrastructure/adapter/CacheConnectionStateManager.java

@@ -5,6 +5,8 @@ import com.sikey.wa04.business.domain.entity.websocket.ConnectionId;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.time.Instant;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -26,10 +28,31 @@ public class CacheConnectionStateManager implements ConnectionStateManager {
 
     @Override
     public void offline(Connection connection) {
-        connections.remove(connection.getId(), connection);
+        connections.remove(connection.getId());
     }
 
     @Override
     public void heartbeat(Connection connection) {
+        connections.put(connection.getId(),
+                connections.get(connection.getId()).setLastHeartbeat(Instant.now()));
+    }
+
+    /**
+     * 获取所有连接
+     *
+     * @return 连接列表
+     */
+    public List<Connection> getConnections() {
+        return List.copyOf(connections.values());
+    }
+
+    /**
+     * 获取连接
+     *
+     * @param connectionId 连接ID
+     * @return 连接
+     */
+    public Connection getConnection(ConnectionId connectionId) {
+        return connections.get(connectionId);
     }
 }

+ 6 - 6
business-infrastructure/src/main/java/com/sikey/wa04/business/infrastructure/adapter/ConnectionStateManager.java

@@ -5,20 +5,20 @@ import com.sikey.wa04.business.domain.entity.websocket.Connection;
 public interface ConnectionStateManager {
 
     /**
-     *
-     * @param connection
+     * 上线
+     * @param connection Connection
      */
     void online(Connection connection);
 
     /**
-     *
-     * @param connection
+     * 下线
+     * @param connection Connection
      */
     void offline(Connection connection);
 
     /**
-     *
-     * @param connection
+     * 心跳
+     * @param connection Connection
      */
     void heartbeat(Connection connection);
 }