1
0
mirror of https://github.com/chatopera/cosin.git synced 2025-06-16 18:30:03 +08:00

Merge pull request #1030 from DevDengChao/feat/mq-destination

feat: 使用前后缀来避免不同 profile 共用 ActiveMQ 实例引发的消息路由错乱问题
This commit is contained in:
lecjy 2024-12-07 10:40:30 +08:00 committed by GitHub
commit 621f4a0434
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 35 additions and 29 deletions

View File

@ -54,7 +54,7 @@ public class AgentAuditSubscription {
*
* @param msg
*/
@JmsListener(destination = Constants.AUDIT_AGENT_MESSAGE, containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.AUDIT_AGENT_MESSAGE + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
public void onMessage(final String msg) {
logger.info("[onMessage] payload {}", msg);
try {

View File

@ -31,7 +31,7 @@ public class AgentSessionSubscription {
*
* @param msg
*/
@JmsListener(destination = Constants.MQ_TOPIC_WEB_SESSION_SSO, containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.MQ_TOPIC_WEB_SESSION_SSO + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
public void onMessage(final String msg) {
logger.info("[onMessage] payload {}", msg);
try {

View File

@ -48,7 +48,7 @@ public class AgentSubscription {
brokerPublisher.send(Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT, j.toString(), true);
}
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT, containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
public void onMessage(final String payload) {
logger.info("[onMessage] payload {}", payload);
JsonParser parser = new JsonParser();

View File

@ -1,14 +1,14 @@
/*
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
* <https://www.chatopera.com>, Licensed under the Chunsong Public
/*
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
* <https://www.chatopera.com>, Licensed under the Chunsong Public
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Copyright (C) 2019-2022 Chatopera Inc, <https://www.chatopera.com>,
* Licensed under the Apache License, Version 2.0,
* Copyright (C) 2019-2022 Chatopera Inc, <https://www.chatopera.com>,
* Licensed under the Apache License, Version 2.0,
* http://www.apache.org/licenses/LICENSE-2.0
*/
package com.cskefu.cc.activemq;
@ -43,7 +43,7 @@ public class BlackListEventSubscription {
*
* @param payload
*/
@JmsListener(destination = Constants.WEBIM_SOCKETIO_ONLINE_USER_BLACKLIST, containerFactory = "jmsListenerContainerQueue")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.WEBIM_SOCKETIO_ONLINE_USER_BLACKLIST + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
public void onMessage(final String payload) {
logger.info("[onMessage] payload {}", payload);

View File

@ -1,13 +1,13 @@
/*
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
* <https://www.chatopera.com>, Licensed under the Chunsong Public
/*
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
* <https://www.chatopera.com>, Licensed under the Chunsong Public
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Copyright (C) 2019-2022 Chatopera Inc, All rights reserved.
* Copyright (C) 2019-2022 Chatopera Inc, All rights reserved.
* <https://www.chatopera.com>
*/
@ -20,10 +20,8 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.stereotype.Component;
import java.util.Map;
@ -35,6 +33,10 @@ public class BrokerPublisher {
@Autowired
private JmsTemplate jmsTemplate;
@Value("${cskefu.activemq.destination.prefix}")
private String prefix;
@Value("${cskefu.activemq.destination.suffix}")
private String suffix;
@PostConstruct
public void setup() {
@ -48,17 +50,18 @@ public class BrokerPublisher {
* @param payload
* @param delay available by delayed seconds
*/
public void send(final String destination, final String payload, final boolean isTopic, final int delay) {
public void send(String destination, final String payload, final boolean isTopic, final int delay) {
destination = prefix + destination + suffix;
try {
if (isTopic) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload, m -> {
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000 * delay);
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000L * delay);
return m;
});
} else {
// 默认为Queue
jmsTemplate.convertAndSend(destination, payload, m -> {
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000 * delay);
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000L * delay);
return m;
});
}
@ -73,7 +76,8 @@ public class BrokerPublisher {
* @param payload
* @param isTopic
*/
public void send(final String destination, final String payload, boolean isTopic) {
public void send(String destination, final String payload, boolean isTopic) {
destination = prefix + destination + suffix;
try {
if (isTopic) {
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload);

View File

@ -17,6 +17,7 @@ import com.cskefu.cc.socketio.client.NettyClients;
import com.cskefu.cc.util.SerializeUtil;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -24,8 +25,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
/**
* IM OnlineUser
*/
@ -57,7 +56,7 @@ public class OnlineUserSubscription {
}
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_TOPIC_ONLINEUSER, containerFactory = "jmsListenerContainerTopic")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_TOPIC_ONLINEUSER + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
public void onMessage(final String payload){
logger.info("[onMessage] payload {}", payload);
JsonParser parser = new JsonParser();

View File

@ -22,6 +22,7 @@ import com.cskefu.cc.model.AgentStatus;
import com.cskefu.cc.persistence.repository.AgentStatusRepository;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,7 +30,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import java.util.Date;
/**
@ -60,7 +60,7 @@ public class SocketioConnEventSubscription {
logger.info("ActiveMQ Subscription is setup successfully.");
}
@JmsListener(destination = Constants.WEBIM_SOCKETIO_AGENT_DISCONNECT, containerFactory = "jmsListenerContainerQueue")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.WEBIM_SOCKETIO_AGENT_DISCONNECT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
public void onMessage(final String payload) {
logger.info("[onMessage] payload {}", payload);

View File

@ -80,7 +80,7 @@ public class ChatbotEventSubscription {
*
* @param payload
*/
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_QUEUE_CHATBOT, containerFactory = "jmsListenerContainerQueue")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_QUEUE_CHATBOT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
public void onMessage(final String payload) {
ChatMessage message = SerializeUtil.deserialize(payload);
try {
@ -238,7 +238,7 @@ public class ChatbotEventSubscription {
for (int i = 0; i < faqReplies.length(); i++) {
JSONObject sugg = new JSONObject();
JSONObject faqReply = faqReplies.getJSONObject(i);
sugg.put("label", Integer.toString(i + 1) + ". " + faqReply.getString("post"));
sugg.put("label", i + 1 + ". " + faqReply.getString("post"));
sugg.put("text", faqReply.getString("post"));
sugg.put("type", "qlist");
suggs.put(sugg);

View File

@ -43,7 +43,7 @@ public class MessengerEventSubscription {
@Autowired
private MessengerMessageProxy messengerMessageProxy;
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_QUEUE_FACEBOOK_OTN, containerFactory = "jmsListenerContainerQueue")
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_QUEUE_FACEBOOK_OTN + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
public void onPublish(final String jsonStr) {
JSONObject payload = JSONObject.parseObject(jsonStr);
String otnId = payload.getString("otnId");

View File

@ -109,6 +109,9 @@ spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
# 使用前后缀来避免不同 profile 共用 ActiveMQ 实例引发的消息路由错乱问题
cskefu.activemq.destination.prefix=
cskefu.activemq.destination.suffix=
##############################################
# Actuator
@ -179,4 +182,4 @@ telemetry.channel.webim.visitor=on
extras.login.banner=off
extras.login.chatbox=off
extras.auth.super-admin.pass=
extras.log.request=off
extras.log.request=off