mirror of
https://github.com/chatopera/cosin.git
synced 2025-06-16 18:30:03 +08:00
feat: 使用前后缀来避免不同 profile 共用 ActiveMQ 实例引发的消息路由错乱问题
Signed-off-by: DevDengChao <2325690622@qq.com>
This commit is contained in:
parent
bff8eef9fd
commit
8f7bf33d18
@ -54,7 +54,7 @@ public class AgentAuditSubscription {
|
|||||||
*
|
*
|
||||||
* @param msg
|
* @param msg
|
||||||
*/
|
*/
|
||||||
@JmsListener(destination = Constants.AUDIT_AGENT_MESSAGE, containerFactory = "jmsListenerContainerTopic")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.AUDIT_AGENT_MESSAGE + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
|
||||||
public void onMessage(final String msg) {
|
public void onMessage(final String msg) {
|
||||||
logger.info("[onMessage] payload {}", msg);
|
logger.info("[onMessage] payload {}", msg);
|
||||||
try {
|
try {
|
||||||
|
@ -31,7 +31,7 @@ public class AgentSessionSubscription {
|
|||||||
*
|
*
|
||||||
* @param msg
|
* @param msg
|
||||||
*/
|
*/
|
||||||
@JmsListener(destination = Constants.MQ_TOPIC_WEB_SESSION_SSO, containerFactory = "jmsListenerContainerTopic")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.MQ_TOPIC_WEB_SESSION_SSO + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
|
||||||
public void onMessage(final String msg) {
|
public void onMessage(final String msg) {
|
||||||
logger.info("[onMessage] payload {}", msg);
|
logger.info("[onMessage] payload {}", msg);
|
||||||
try {
|
try {
|
||||||
|
@ -48,7 +48,7 @@ public class AgentSubscription {
|
|||||||
brokerPublisher.send(Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT, j.toString(), true);
|
brokerPublisher.send(Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT, j.toString(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT, containerFactory = "jmsListenerContainerTopic")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_TOPIC_AGENT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
|
||||||
public void onMessage(final String payload) {
|
public void onMessage(final String payload) {
|
||||||
logger.info("[onMessage] payload {}", payload);
|
logger.info("[onMessage] payload {}", payload);
|
||||||
JsonParser parser = new JsonParser();
|
JsonParser parser = new JsonParser();
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
|
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
|
||||||
* <https://www.chatopera.com>, Licensed under the Chunsong Public
|
* <https://www.chatopera.com>, Licensed under the Chunsong Public
|
||||||
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
|
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
* Copyright (C) 2019-2022 Chatopera Inc, <https://www.chatopera.com>,
|
* Copyright (C) 2019-2022 Chatopera Inc, <https://www.chatopera.com>,
|
||||||
* Licensed under the Apache License, Version 2.0,
|
* Licensed under the Apache License, Version 2.0,
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*/
|
*/
|
||||||
package com.cskefu.cc.activemq;
|
package com.cskefu.cc.activemq;
|
||||||
@ -43,7 +43,7 @@ public class BlackListEventSubscription {
|
|||||||
*
|
*
|
||||||
* @param payload
|
* @param payload
|
||||||
*/
|
*/
|
||||||
@JmsListener(destination = Constants.WEBIM_SOCKETIO_ONLINE_USER_BLACKLIST, containerFactory = "jmsListenerContainerQueue")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.WEBIM_SOCKETIO_ONLINE_USER_BLACKLIST + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
|
||||||
public void onMessage(final String payload) {
|
public void onMessage(final String payload) {
|
||||||
logger.info("[onMessage] payload {}", payload);
|
logger.info("[onMessage] payload {}", payload);
|
||||||
|
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
|
* Copyright (C) 2023 Beijing Huaxia Chunsong Technology Co., Ltd.
|
||||||
* <https://www.chatopera.com>, Licensed under the Chunsong Public
|
* <https://www.chatopera.com>, Licensed under the Chunsong Public
|
||||||
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
|
* License, Version 1.0 (the "License"), https://docs.cskefu.com/licenses/v1.html
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
* Copyright (C) 2019-2022 Chatopera Inc, All rights reserved.
|
* Copyright (C) 2019-2022 Chatopera Inc, All rights reserved.
|
||||||
* <https://www.chatopera.com>
|
* <https://www.chatopera.com>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -20,10 +20,8 @@ import org.apache.activemq.command.ActiveMQTopic;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.jms.core.JmsMessagingTemplate;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.jms.core.JmsTemplate;
|
import org.springframework.jms.core.JmsTemplate;
|
||||||
import org.springframework.messaging.Message;
|
|
||||||
import org.springframework.messaging.core.MessagePostProcessor;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -35,6 +33,10 @@ public class BrokerPublisher {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private JmsTemplate jmsTemplate;
|
private JmsTemplate jmsTemplate;
|
||||||
|
@Value("${cskefu.activemq.destination.prefix}")
|
||||||
|
private String prefix;
|
||||||
|
@Value("${cskefu.activemq.destination.suffix}")
|
||||||
|
private String suffix;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void setup() {
|
public void setup() {
|
||||||
@ -48,17 +50,18 @@ public class BrokerPublisher {
|
|||||||
* @param payload
|
* @param payload
|
||||||
* @param delay available by delayed seconds
|
* @param delay available by delayed seconds
|
||||||
*/
|
*/
|
||||||
public void send(final String destination, final String payload, final boolean isTopic, final int delay) {
|
public void send(String destination, final String payload, final boolean isTopic, final int delay) {
|
||||||
|
destination = prefix + destination + suffix;
|
||||||
try {
|
try {
|
||||||
if (isTopic) {
|
if (isTopic) {
|
||||||
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload, m -> {
|
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload, m -> {
|
||||||
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000 * delay);
|
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000L * delay);
|
||||||
return m;
|
return m;
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// 默认为Queue
|
// 默认为Queue
|
||||||
jmsTemplate.convertAndSend(destination, payload, m -> {
|
jmsTemplate.convertAndSend(destination, payload, m -> {
|
||||||
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000 * delay);
|
m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000L * delay);
|
||||||
return m;
|
return m;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -73,7 +76,8 @@ public class BrokerPublisher {
|
|||||||
* @param payload
|
* @param payload
|
||||||
* @param isTopic
|
* @param isTopic
|
||||||
*/
|
*/
|
||||||
public void send(final String destination, final String payload, boolean isTopic) {
|
public void send(String destination, final String payload, boolean isTopic) {
|
||||||
|
destination = prefix + destination + suffix;
|
||||||
try {
|
try {
|
||||||
if (isTopic) {
|
if (isTopic) {
|
||||||
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload);
|
jmsTemplate.convertAndSend(new ActiveMQTopic(destination), payload);
|
||||||
|
@ -17,6 +17,7 @@ import com.cskefu.cc.socketio.client.NettyClients;
|
|||||||
import com.cskefu.cc.util.SerializeUtil;
|
import com.cskefu.cc.util.SerializeUtil;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import com.google.gson.JsonParser;
|
import com.google.gson.JsonParser;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -24,8 +25,6 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.jms.annotation.JmsListener;
|
import org.springframework.jms.annotation.JmsListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IM OnlineUser
|
* IM OnlineUser
|
||||||
*/
|
*/
|
||||||
@ -57,7 +56,7 @@ public class OnlineUserSubscription {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_TOPIC_ONLINEUSER, containerFactory = "jmsListenerContainerTopic")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_TOPIC_ONLINEUSER + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerTopic")
|
||||||
public void onMessage(final String payload){
|
public void onMessage(final String payload){
|
||||||
logger.info("[onMessage] payload {}", payload);
|
logger.info("[onMessage] payload {}", payload);
|
||||||
JsonParser parser = new JsonParser();
|
JsonParser parser = new JsonParser();
|
||||||
|
@ -22,6 +22,7 @@ import com.cskefu.cc.model.AgentStatus;
|
|||||||
import com.cskefu.cc.persistence.repository.AgentStatusRepository;
|
import com.cskefu.cc.persistence.repository.AgentStatusRepository;
|
||||||
import com.google.gson.JsonObject;
|
import com.google.gson.JsonObject;
|
||||||
import com.google.gson.JsonParser;
|
import com.google.gson.JsonParser;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -29,7 +30,6 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.jms.annotation.JmsListener;
|
import org.springframework.jms.annotation.JmsListener;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import jakarta.annotation.PostConstruct;
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -60,7 +60,7 @@ public class SocketioConnEventSubscription {
|
|||||||
logger.info("ActiveMQ Subscription is setup successfully.");
|
logger.info("ActiveMQ Subscription is setup successfully.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@JmsListener(destination = Constants.WEBIM_SOCKETIO_AGENT_DISCONNECT, containerFactory = "jmsListenerContainerQueue")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.WEBIM_SOCKETIO_AGENT_DISCONNECT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
|
||||||
public void onMessage(final String payload) {
|
public void onMessage(final String payload) {
|
||||||
logger.info("[onMessage] payload {}", payload);
|
logger.info("[onMessage] payload {}", payload);
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ public class ChatbotEventSubscription {
|
|||||||
*
|
*
|
||||||
* @param payload
|
* @param payload
|
||||||
*/
|
*/
|
||||||
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_QUEUE_CHATBOT, containerFactory = "jmsListenerContainerQueue")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_QUEUE_CHATBOT + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
|
||||||
public void onMessage(final String payload) {
|
public void onMessage(final String payload) {
|
||||||
ChatMessage message = SerializeUtil.deserialize(payload);
|
ChatMessage message = SerializeUtil.deserialize(payload);
|
||||||
try {
|
try {
|
||||||
@ -238,7 +238,7 @@ public class ChatbotEventSubscription {
|
|||||||
for (int i = 0; i < faqReplies.length(); i++) {
|
for (int i = 0; i < faqReplies.length(); i++) {
|
||||||
JSONObject sugg = new JSONObject();
|
JSONObject sugg = new JSONObject();
|
||||||
JSONObject faqReply = faqReplies.getJSONObject(i);
|
JSONObject faqReply = faqReplies.getJSONObject(i);
|
||||||
sugg.put("label", Integer.toString(i + 1) + ". " + faqReply.getString("post"));
|
sugg.put("label", i + 1 + ". " + faqReply.getString("post"));
|
||||||
sugg.put("text", faqReply.getString("post"));
|
sugg.put("text", faqReply.getString("post"));
|
||||||
sugg.put("type", "qlist");
|
sugg.put("type", "qlist");
|
||||||
suggs.put(sugg);
|
suggs.put(sugg);
|
||||||
|
@ -43,7 +43,7 @@ public class MessengerEventSubscription {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private MessengerMessageProxy messengerMessageProxy;
|
private MessengerMessageProxy messengerMessageProxy;
|
||||||
|
|
||||||
@JmsListener(destination = Constants.INSTANT_MESSAGING_MQ_QUEUE_FACEBOOK_OTN, containerFactory = "jmsListenerContainerQueue")
|
@JmsListener(destination = "${cskefu.activemq.destination.prefix}" + Constants.INSTANT_MESSAGING_MQ_QUEUE_FACEBOOK_OTN + "${cskefu.activemq.destination.suffix}", containerFactory = "jmsListenerContainerQueue")
|
||||||
public void onPublish(final String jsonStr) {
|
public void onPublish(final String jsonStr) {
|
||||||
JSONObject payload = JSONObject.parseObject(jsonStr);
|
JSONObject payload = JSONObject.parseObject(jsonStr);
|
||||||
String otnId = payload.getString("otnId");
|
String otnId = payload.getString("otnId");
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
#
|
#
|
||||||
# Copyright (C) 2017 优客服-多渠道客服系统
|
# Copyright (C) 2017 \u4F18\u5BA2\u670D-\u591A\u6E20\u9053\u5BA2\u670D\u7CFB\u7EDF
|
||||||
# Modifications copyright (C) 2018-2022 Chatopera Inc, <https://www.chatopera.com>
|
# Modifications copyright (C) 2018-2022 Chatopera Inc, <https://www.chatopera.com>
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -48,7 +48,7 @@ server.tomcat.max-swallow-size=1000000000
|
|||||||
server.tomcat.max-threads=400
|
server.tomcat.max-threads=400
|
||||||
|
|
||||||
##############################################
|
##############################################
|
||||||
# 日志及临时文件存储
|
# \u65E5\u5FD7\u53CA\u4E34\u65F6\u6587\u4EF6\u5B58\u50A8
|
||||||
##############################################
|
##############################################
|
||||||
server.log.path=../logs
|
server.log.path=../logs
|
||||||
server.log.level=INFO
|
server.log.level=INFO
|
||||||
@ -109,6 +109,9 @@ spring.activemq.user=admin
|
|||||||
spring.activemq.password=admin
|
spring.activemq.password=admin
|
||||||
spring.activemq.pool.enabled=true
|
spring.activemq.pool.enabled=true
|
||||||
spring.activemq.pool.max-connections=50
|
spring.activemq.pool.max-connections=50
|
||||||
|
# \u4F7F\u7528\u524D\u540E\u7F00\u6765\u907F\u514D\u4E0D\u540C profile \u5171\u7528 ActiveMQ \u5B9E\u4F8B\u5F15\u53D1\u7684\u6D88\u606F\u8DEF\u7531\u9519\u4E71\u95EE\u9898
|
||||||
|
cskefu.activemq.destination.prefix=
|
||||||
|
cskefu.activemq.destination.suffix=
|
||||||
|
|
||||||
##############################################
|
##############################################
|
||||||
# Actuator
|
# Actuator
|
||||||
|
Loading…
x
Reference in New Issue
Block a user