rabbitmq实现延迟消息

This commit is contained in:
zhh 2018-09-17 11:27:10 +08:00
parent f50739ac20
commit e298a170d3
8 changed files with 100 additions and 20 deletions

View File

@ -13,13 +13,13 @@ import org.springframework.stereotype.Component;
* Created by macro on 2018/9/14. * Created by macro on 2018/9/14.
*/ */
@Component @Component
@RabbitListener(queues = "${rabbitmq.queue.name.cancelOrder}") @RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver { public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired @Autowired
private OmsPortalOrderService portalOrderService; private OmsPortalOrderService portalOrderService;
@RabbitHandler @RabbitHandler
public void process(Long orderId){ public void handle(Long orderId){
portalOrderService.cancelOrder(orderId); portalOrderService.cancelOrder(orderId);
LOGGER.info("process orderId:{}",orderId); LOGGER.info("process orderId:{}",orderId);
} }

View File

@ -1,10 +1,13 @@
package com.macro.mall.portal.component; package com.macro.mall.portal.component;
import com.macro.mall.portal.domain.QueueEnum;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@ -14,13 +17,19 @@ import org.springframework.stereotype.Component;
@Component @Component
public class CancelOrderSender { public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Value("${rabbitmq.queue.name.cancelOrder}")
private String QUEUE_NAME_CANCEL_ORDER;
@Autowired @Autowired
private AmqpTemplate amqpTemplate; private AmqpTemplate amqpTemplate;
public void send(Long orderId){ public void sendMessage(Long orderId,final long delayTimes){
amqpTemplate.convertAndSend(QUEUE_NAME_CANCEL_ORDER,orderId); //给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send orderId:{}",orderId); LOGGER.info("send orderId:{}",orderId);
} }
} }

View File

@ -1,7 +1,7 @@
package com.macro.mall.portal.config; package com.macro.mall.portal.config;
import org.springframework.amqp.core.Queue; import com.macro.mall.portal.domain.QueueEnum;
import org.springframework.beans.factory.annotation.Value; import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -11,13 +11,69 @@ import org.springframework.context.annotation.Configuration;
*/ */
@Configuration @Configuration
public class RabbitMqConfig { public class RabbitMqConfig {
@Value("${rabbitmq.queue.name.cancelOrder}")
private String QUEUE_NAME_CANCEL_ORDER;
/** /**
* 超时取消订单消息 * 订单消息实际消费队列所绑定的交换机
*/ */
@Bean @Bean
public Queue cancelOrderQueue(){ DirectExchange orderDirect() {
return new Queue(QUEUE_NAME_CANCEL_ORDER); return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
} }
/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* 订单延迟队列死信队列
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
} }

View File

@ -1,6 +1,5 @@
package com.macro.mall.portal.controller; package com.macro.mall.portal.controller;
import com.macro.mall.portal.component.CancelOrderSender;
import com.macro.mall.portal.domain.CommonResult; import com.macro.mall.portal.domain.CommonResult;
import com.macro.mall.portal.domain.ConfirmOrderResult; import com.macro.mall.portal.domain.ConfirmOrderResult;
import com.macro.mall.portal.domain.OrderParam; import com.macro.mall.portal.domain.OrderParam;
@ -21,8 +20,6 @@ import org.springframework.web.bind.annotation.*;
public class OmsPortalOrderController { public class OmsPortalOrderController {
@Autowired @Autowired
private OmsPortalOrderService portalOrderService; private OmsPortalOrderService portalOrderService;
@Autowired
private CancelOrderSender cancelOrderSender;
@ApiOperation("根据购物车信息生成确认单信息") @ApiOperation("根据购物车信息生成确认单信息")
@RequestMapping(value = "/generateConfirmOrder",method = RequestMethod.POST) @RequestMapping(value = "/generateConfirmOrder",method = RequestMethod.POST)
@ResponseBody @ResponseBody
@ -55,7 +52,7 @@ public class OmsPortalOrderController {
@RequestMapping(value = "/cancelOrder",method = RequestMethod.POST) @RequestMapping(value = "/cancelOrder",method = RequestMethod.POST)
@ResponseBody @ResponseBody
public Object cancelOrder(Long orderId){ public Object cancelOrder(Long orderId){
cancelOrderSender.send(orderId); portalOrderService.sendDelayMessageCancelOrder(orderId);
return new CommonResult().success(null); return new CommonResult().success(null);
} }
} }

View File

@ -11,11 +11,11 @@ public enum QueueEnum {
/** /**
* 消息通知队列 * 消息通知队列
*/ */
MESSAGE_QUEUE_ORDER("mall.order.direct", "mall.order.cancel", "mall.order.cancel"), QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/** /**
* 消息通知ttl队列 * 消息通知ttl队列
*/ */
MESSAGE_TTL_QUEUE_ORDER("mall.order.topic.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl"); QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
/** /**
* 交换名称 * 交换名称

View File

@ -38,4 +38,9 @@ public interface OmsPortalOrderService {
*/ */
@Transactional @Transactional
void cancelOrder(Long orderId); void cancelOrder(Long orderId);
/**
* 发送延迟消息取消订单
*/
void sendDelayMessageCancelOrder(Long orderId);
} }

View File

@ -54,6 +54,8 @@ public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private OmsOrderSettingMapper orderSettingMapper; private OmsOrderSettingMapper orderSettingMapper;
@Autowired @Autowired
private OmsOrderItemMapper orderItemMapper; private OmsOrderItemMapper orderItemMapper;
@Autowired
private CancelOrderSender cancelOrderSender;
@Override @Override
public ConfirmOrderResult generateConfirmOrder() { public ConfirmOrderResult generateConfirmOrder() {
@ -292,6 +294,15 @@ public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
} }
} }
@Override
public void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间
OmsOrderSetting orderSetting = orderSettingMapper.selectByPrimaryKey(1L);
long delayTimes = orderSetting.getNormalOrderOvertime()*60*1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId,delayTimes);
}
/** /**
* 生成18位订单编号:8位日期+2位平台号码+2位支付方式+6位以上自增id * 生成18位订单编号:8位日期+2位平台号码+2位支付方式+6位以上自增id
*/ */

View File

@ -66,5 +66,7 @@ spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/mall spring.rabbitmq.virtual-host=/mall
spring.rabbitmq.username=mall spring.rabbitmq.username=mall
spring.rabbitmq.password=mall spring.rabbitmq.password=mall
#如果对异步消息需要回调必须设置为true
spring.rabbitmq.publisher-confirms=true
#===rabbitMq end=== #===rabbitMq end===