diff --git a/docs/high-performance/message-queue/rocketmq-questions.md b/docs/high-performance/message-queue/rocketmq-questions.md index b1334908..5b04ec0c 100644 --- a/docs/high-performance/message-queue/rocketmq-questions.md +++ b/docs/high-performance/message-queue/rocketmq-questions.md @@ -291,7 +291,7 @@ tag: #### 定时消息 -在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。 +在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。在 4.x 版本中,只支持延时消息,默认分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也可以在配置文件中增加自定义的延时等级和时长。在 5.x 版本中,开始支持定时消息,在构造消息时提供了 3 个 API 来指定延迟时间或定时时间。 基于定时消息的超时任务处理具备如下优势: @@ -319,7 +319,7 @@ tag: #### 事务消息 -施工中。。。 +事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。简单来讲,就是将本地事务(数据库的DML操作)与发送消息合并在同一个事务中。例如,新增一个订单。在事务未提交之前,不发送订阅的消息。发送消息的动作随着事务的成功提交而发送,随着事务的回滚而取消。当然真正地处理过程不止这么简单,包含了半消息、事务监听和事务回查等概念,下面有更详细的说明。 ## 关于发送消息 @@ -540,6 +540,210 @@ emmm,就两个字—— **幂等** 。在编程中一个*幂等* 操作的特 你还需要注意的是,在 `MQ Server` 指向系统 B 的操作已经和系统 A 不相关了,也就是说在消息队列中的分布式事务是——**本地事务和存储消息到消息队列才是同一个事务**。这样也就产生了事务的**最终一致性**,因为整个过程是异步的,**每个系统只要保证它自己那一部分的事务就行了**。 +实践中会遇到的问题:事务消息需要一个事务监听器来监听本地事务是否成功,并且事务监听器接口只允许被实现一次。那就意味着需要把各种事务消息的本地事务都写在一个接口方法里面,必将会产生大量的耦合和类型判断。采用函数 Function 接口来包装整个业务过程,作为一个参数传递到监听器的接口方法中。再调用 Function 的 apply() 方法来执行业务,事务也会在 apply() 方法中执行。让监听器与业务之间实现解耦,使之具备了真实生产环境中的可行性。 + +1.模拟一个添加用户浏览记录的需求 +```java +@PostMapping("/add") +@ApiOperation("添加用户浏览记录") +public Result add(Long userId, Long forecastLogId) { + + // 函数式编程:浏览记录入库 + Function function = transactionId -> viewHistoryHandler.addViewHistory(transactionId, userId, forecastLogId); + + Map hashMap = new HashMap<>(); + hashMap.put("userId", userId); + hashMap.put("forecastLogId", forecastLogId); + String jsonString = JSON.toJSONString(hashMap); + + // 发送事务消息;将本地的事务操作,用函数Function接口接收,作为一个参数传入到方法中 + TransactionSendResult transactionSendResult = mqProducerService.sendTransactionMessage(jsonString, MQDestination.TAG_ADD_VIEW_HISTORY, function); + return Result.success(transactionSendResult); +} +``` + +2.发送事务消息的方法 +```java +/** + * 发送事务消息 + * + * @param msgBody + * @param tag + * @param function + * @return + */ +public TransactionSendResult sendTransactionMessage(String msgBody, String tag, Function function) { + // 构建消息体 + Message message = buildMessage(msgBody); + + // 构建消息投递信息 + String destination = buildDestination(tag); + + TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination, message, function); + return result; +} +``` + +3.生产者消息监听器,只允许一个类去实现该监听器 +```java +@Slf4j +@RocketMQTransactionListener +public class TransactionMsgListener implements RocketMQLocalTransactionListener { + + @Autowired + private RedisService redisService; + + /** + * 执行本地事务(在发送消息成功时执行) + * + * @param message + * @param o + * @return commit or rollback or unknown + */ + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { + + // 1、获取事务ID + String transactionId = null; + try { + transactionId = message.getHeaders().get("rocketmq_TRANSACTION_ID").toString(); + // 2、判断传入函数对象是否为空,如果为空代表没有要执行的业务直接抛弃消息 + if (o == null) { + //返回ROLLBACK状态的消息会被丢弃 + log.info("事务消息回滚,没有需要处理的业务 transactionId={}", transactionId); + return RocketMQLocalTransactionState.ROLLBACK; + } + // 将Object o转换成Function对象 + Function function = (Function) o; + // 执行业务 事务也会在function.apply中执行 + Boolean apply = function.apply(transactionId); + if (apply) { + log.info("事务提交,消息正常处理 transactionId={}", transactionId); + //返回COMMIT状态的消息会立即被消费者消费到 + return RocketMQLocalTransactionState.COMMIT; + } + } catch (Exception e) { + log.info("出现异常 返回ROLLBACK transactionId={}", transactionId); + return RocketMQLocalTransactionState.ROLLBACK; + } + return RocketMQLocalTransactionState.ROLLBACK; + } + + /** + * 事务回查机制,检查本地事务的状态 + * + * @param message + * @return + */ + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message message) { + + String transactionId = message.getHeaders().get("rocketmq_TRANSACTION_ID").toString(); + + // 查redis + MqTransaction mqTransaction = redisService.getCacheObject("mqTransaction:" + transactionId); + if (Objects.isNull(mqTransaction)) { + return RocketMQLocalTransactionState.ROLLBACK; + } + return RocketMQLocalTransactionState.COMMIT; + } +} +``` + +4.模拟的业务场景,这里的方法必须提取出来,放在别的类里面.如果调用方与被调用方在同一个类中,会发生事务失效的问题. +```java +@Component +public class ViewHistoryHandler { + + @Autowired + private IViewHistoryService viewHistoryService; + + @Autowired + private IMqTransactionService mqTransactionService; + + @Autowired + private RedisService redisService; + + /** + * 浏览记录入库 + * + * @param transactionId + * @param userId + * @param forecastLogId + * @return + */ + @Transactional + public Boolean addViewHistory(String transactionId, Long userId, Long forecastLogId) { + // 构建浏览记录 + ViewHistory viewHistory = new ViewHistory(); + viewHistory.setUserId(userId); + viewHistory.setForecastLogId(forecastLogId); + viewHistory.setCreateTime(LocalDateTime.now()); + boolean save = viewHistoryService.save(viewHistory); + + // 本地事务信息 + MqTransaction mqTransaction = new MqTransaction(); + mqTransaction.setTransactionId(transactionId); + mqTransaction.setCreateTime(new Date()); + mqTransaction.setStatus(MqTransaction.StatusEnum.VALID.getStatus()); + + // 1.可以把事务信息存数据库 + mqTransactionService.save(mqTransaction); + + // 2.也可以选择存redis,4个小时有效期,'4个小时'是RocketMQ内置的最大回查超时时长,过期未确认将强制回滚 + redisService.setCacheObject("mqTransaction:" + transactionId, mqTransaction, 4L, TimeUnit.HOURS); + + // 放开注释,模拟异常,事务回滚 + // int i = 10 / 0; + + return save; + } +} +``` +5.消费消息,以及幂等处理 +```java +@Service +@RocketMQMessageListener(topic = MQDestination.TOPIC, selectorExpression = MQDestination.TAG_ADD_VIEW_HISTORY, consumerGroup = MQDestination.TAG_ADD_VIEW_HISTORY) +public class ConsumerAddViewHistory implements RocketMQListener { + // 监听到消息就会执行此方法 + @Override + public void onMessage(Message message) { + // 幂等校验 + String transactionId = message.getTransactionId(); + + // 查redis + MqTransaction mqTransaction = redisService.getCacheObject("mqTransaction:" + transactionId); + + // 不存在事务记录 + if (Objects.isNull(mqTransaction)) { + return; + } + + // 已消费 + if (Objects.equals(mqTransaction.getStatus(), MqTransaction.StatusEnum.CONSUMED.getStatus())) { + return; + } + + String msg = new String(message.getBody()); + Map map = JSON.parseObject(msg, new TypeReference>() { + }); + Long userId = map.get("userId"); + Long forecastLogId = map.get("forecastLogId"); + + // 下游的业务处理 + // TODO 记录用户喜好,更新用户画像 + + // TODO 更新'证券预测文章'的浏览量,重新计算文章的曝光排序 + + // 更新状态为已消费 + mqTransaction.setUpdateTime(new Date()); + mqTransaction.setStatus(MqTransaction.StatusEnum.CONSUMED.getStatus()); + redisService.setCacheObject("mqTransaction:" + transactionId, mqTransaction, 4L, TimeUnit.HOURS); + log.info("监听到消息:msg={}", JSON.toJSONString(map)); + } +} +``` + ## 如何解决消息堆积问题? 在上面我们提到了消息队列一个很重要的功能——**削峰** 。那么如果这个峰值太大了导致消息堆积在队列中怎么办呢?