|
@@ -0,0 +1,81 @@
|
|
|
+package com.sikey.wa04.api.websocket.client.handler;
|
|
|
+
|
|
|
+import com.sikey.wa04.business.application.service.MessagingSubscribeService;
|
|
|
+import com.sikey.wa04.business.domain.adapter.GroupMemberAdapter;
|
|
|
+import com.sikey.wa04.business.domain.adapter.MessageAdapter;
|
|
|
+import com.sikey.wa04.business.domain.adapter.MessageReceivedAdapter;
|
|
|
+import com.sikey.wa04.business.domain.entity.GroupId;
|
|
|
+import com.sikey.wa04.business.domain.entity.GroupMember;
|
|
|
+import com.sikey.wa04.business.domain.entity.message.Message;
|
|
|
+import com.sikey.wa04.business.domain.entity.message.MessageId;
|
|
|
+import com.sikey.wa04.business.domain.entity.message.MessageReceivedLog;
|
|
|
+import com.sikey.wa04.business.domain.entity.websocket.Ack;
|
|
|
+import com.sikey.wa04.business.infrastructure.adapter.RedisConnectionStateManager;
|
|
|
+import com.sikey.wa04.business.infrastructure.aop.MessagingHandlerTrace;
|
|
|
+import com.sikey.wa04.common.dto.message.BasicChatMessage;
|
|
|
+import com.sikey.wa04.common.dto.message.BasicNoticeMessage;
|
|
|
+import com.sikey.wa04.common.enums.MessageStatus;
|
|
|
+import jakarta.annotation.Resource;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.messaging.handler.annotation.Headers;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 收到 MQ 消息
|
|
|
+ *
|
|
|
+ * @author luoyangwei
|
|
|
+ * @date 2024年10月12日17:07:41
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@RabbitListener(queues = "publish_queue")
|
|
|
+public class MessageQueueHandler {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MessagingSubscribeService messagingSubscribeService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private RedisConnectionStateManager redisConnectionStateManager;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MessageAdapter messageAdapter;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MessageReceivedAdapter messageReceivedAdapter;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private GroupMemberAdapter groupMemberAdapter;
|
|
|
+
|
|
|
+
|
|
|
+ @RabbitHandler
|
|
|
+ @MessagingHandlerTrace
|
|
|
+ public void receiveChatMessage(BasicChatMessage chatMessage, @Headers Map<String, Object> headers) {
|
|
|
+ log.info("Received chat message: {}, {}", chatMessage, headers);
|
|
|
+
|
|
|
+ MessageId messageId = new MessageId(chatMessage.getMsgId());
|
|
|
+ Message message = messageAdapter.find(messageId);
|
|
|
+
|
|
|
+ List<MessageReceivedLog> receivedLogs = new ArrayList<>();
|
|
|
+ message.getReceiverId().stream().map(receiverId ->
|
|
|
+ new MessageReceivedLog().setReceiverId(receiverId).setStatus(MessageStatus.UNREAD)
|
|
|
+ ).forEach(receivedLogs::add);
|
|
|
+ message.setReceivedLogs(receivedLogs);
|
|
|
+
|
|
|
+ // 发送 Ack 消息
|
|
|
+ List<Ack> ack = messageReceivedAdapter.save(message, message.getReceivedLogs());
|
|
|
+
|
|
|
+ messagingSubscribeService.consume(message);
|
|
|
+ }
|
|
|
+
|
|
|
+ @RabbitHandler
|
|
|
+ @MessagingHandlerTrace
|
|
|
+ public void receiveNoticeMessage(BasicNoticeMessage message, @Headers Map<String, Object> headers) {
|
|
|
+ log.info("Received notice message: {}, {}", message, headers);
|
|
|
+ }
|
|
|
+}
|