1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

Merge pull request #2133 from dengminchuan/main

RocketMQ完善
This commit is contained in:
Guide 2023-08-15 20:26:01 +08:00 committed by GitHub
commit 8acecb92a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -298,6 +298,49 @@ tag:
其实很简单,我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 **Hash 取模法** 来保证同一个订单在同一个队列中就行了。
RocketMQ实现了两种队列选择算法也可以自己实现
- 轮询算法
- 轮询算法就是向消息指定的topic所在队列中依次发送消息保证消息均匀分布
- 是RocketMQ默认队列选择算法
- 最小投递延迟算法
- 每次消息投递的时候统计消息投递的延迟,选择队列时优先选择消息延时小的队列,导致消息分布不均匀,按照如下设置即可。
- ```java
producer.setSendLatencyFaultEnable(true);
```
- 继承MessageQueueSelector实现
- ```java
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//从mqs中选择一个队列,可以根据msg特点选择
return null;
}
}, new Object());
```
### 特殊情况处理
#### 发送异常
选择队列后会与Broker建立连接通过网络请求将消息发送到Broker上如果Broker挂了或者网络波动发送消息超时此时RocketMQ会进行重试。
重新选择其他Broker中的消息队列进行发送默认重试两次可以手动设置。
```java
producer.setRetryTimesWhenSendFailed(5);
```
#### 消息过大
消息超过4k时RocketMQ会将消息压缩后在发送到Broker上减少网络资源的占用。
### 重复消费
emmm就两个字—— **幂等** 。在编程中一个*幂等* 操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。比如说,这个时候我们有一个订单的处理积分的系统,每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次,消息队列发送给订单系统 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动(当然还有很多种情况,比如 Broker 意外重启等等),这条回应没有发送成功。
@ -352,6 +395,68 @@ emmm就两个字—— **幂等** 。在编程中一个*幂等* 操作的特
这是官方文档的解释,我直接照搬过来就当科普了 😁😁😁。
## RocketMQ如何保证高性能读写
### 传统IO方式
![3](https://img1.imgtp.com/2023/08/15/9DQUZuL7.png)
传统的IO读写其实就是read + write的操作整个过程会分为如下几步
- 用户调用read()方法开始读取数据此时发生一次上下文从用户态到内核态的切换也就是图示的切换1
- 将磁盘数据通过DMA拷贝到内核缓存区
- 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
- read()方法返回此时就会从内核态切换到用户态也就是图示的切换2
- 当我们拿到数据之后就可以调用write()方法此时上下文会从用户态切换到内核态即图示切换3
- CPU将用户缓冲区的数据拷贝到Socket缓冲区
- 将Socket缓冲区数据拷贝至网卡
- write()方法返回上下文重新从内核态切换到用户态即图示切换4
整个过程发生了4次上下文切换和4次数据的拷贝这在高并发场景下肯定会严重影响读写性能故引入了零拷贝技术
### 零拷贝技术
#### mmap
mmapmemory map是一种内存映射文件的方法即将一个文件或者其它对象映射到进程的地址空间实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
简单地说就是内核缓冲区和应用缓冲区共享从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。基于此上述架构图可变为
![4](https://img1.imgtp.com/2023/08/15/CHmGd0II.png)
基于mmap IO读写其实就变成mmap + write的操作也就是用mmap替代传统IO中的read操作。
当用户发起mmap调用的时候会发生上下文切换1进行内存映射然后数据被拷贝到内核缓冲区mmap返回发生上下文切换2随后用户调用write发生上下文切换3将内核缓冲区的数据拷贝到Socket缓冲区write返回发生上下文切换4。
发生4次上下文切换和3次IO拷贝操作在Java中的实现
```java
FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
```
#### sendfile
sendfile()跟mmap()一样也会减少一次CPU拷贝但是它同时也会减少两次上下文切换。
![5](https://img1.imgtp.com/2023/08/15/jqLgCEBY.png)
如图用户在发起sendfile()调用时会发生切换1之后数据通过DMA拷贝到内核缓冲区之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区最后拷贝到网卡sendfile()返回发生切换2。发生了3次拷贝和两次切换。Java也提供了相应api
```java
FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//调用transferTo方法向目标数据传输
channel.transferTo(position, len, target);
```
在如上代码中并没有文件的读写操作而是直接将文件的数据传输到target目标缓冲区也就是说sendfile是无法知道文件的具体的数据的但是mmap不一样他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输只有mmap可以满足。
通过上面的一些介绍结论是基于零拷贝技术可以减少CPU的拷贝次数和上下文切换次数从而可以实现文件高效的读写操作。
RocketMQ内部主要是使用基于mmap实现的零拷贝(其实就是调用上述提到的api)用来读写文件这也是RocketMQ为什么快的一个很重要原因。
## RocketMQ 的刷盘机制
上面我讲了那么多的 `RocketMQ` 的架构和设计原理,你有没有好奇