From 24e50acc7e40fe8e8b7419e0af88e70b73ac4065 Mon Sep 17 00:00:00 2001 From: Guide Date: Sat, 12 Aug 2023 11:51:51 +0800 Subject: [PATCH] =?UTF-8?q?[docs=20update]=E5=AF=B9=20kafka=20=E5=86=85?= =?UTF-8?q?=E5=AE=B9=E7=9A=84=20pr=20=E8=BF=9B=E8=A1=8C=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message-queue/kafka-questions-01.md | 174 ++++++++++++------ 1 file changed, 116 insertions(+), 58 deletions(-) diff --git a/docs/high-performance/message-queue/kafka-questions-01.md b/docs/high-performance/message-queue/kafka-questions-01.md index 648f9fe2..a39d3f6c 100644 --- a/docs/high-performance/message-queue/kafka-questions-01.md +++ b/docs/high-performance/message-queue/kafka-questions-01.md @@ -5,6 +5,8 @@ tag: - 消息队列 --- +## Kafka 基础 + ### Kafka 是什么?主要应用场景有哪些? Kafka 是一个分布式流式处理平台。这到底是什么意思呢? @@ -61,6 +63,8 @@ Kafka 主要有两大应用场景: > **RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。** +## Kafka 核心概念 + ### 什么是 Producer、Consumer、Broker、Topic、Partition? Kafka 将生产者发布的消息发送到 **Topic(主题)** 中,需要这些消息的消费者可以订阅这些 **Topic(主题)**,如下图所示: @@ -91,13 +95,15 @@ Kafka 将生产者发布的消息发送到 **Topic(主题)** 中,需要这 1. Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。 2. Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。 -### Zookeeper 在 Kafka 中的作用知道吗? +## Zookeeper 和 Kafka -> **要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。** 一定不要光看不实践,这样学来的也终会忘记!这部分内容参考和借鉴了这篇文章:https://www.jianshu.com/p/a036405f989c 。 +### Zookeeper 在 Kafka 中的作用是什么? + +> 要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。 一定不要光看不实践,这样学来的也终会忘记!这部分内容参考和借鉴了这篇文章:https://www.jianshu.com/p/a036405f989c 。 下图就是我的本地 Zookeeper ,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)。 - + ZooKeeper 主要为 Kafka 提供元数据的管理的功能。 @@ -108,6 +114,16 @@ ZooKeeper 主要为 Kafka 提供元数据的管理的功能。 3. **负载均衡**:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。 4. ...... +### 使用 Kafka 能否不引入 Zookeeper? + +在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。 + +不过,要提示一下:**如果要使用 KRaft 模式的话,建议选择较高版本的 Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。** + +![](https://oss.javaguide.cn/github/javaguide/high-performance/message-queue/kafka3.3.1-kraft- production-ready.png) + +## Kafka 消费顺序、消息丢失和重复消费 + ### Kafka 如何保证消息的消费顺序? 我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是: @@ -119,7 +135,7 @@ ZooKeeper 主要为 Kafka 提供元数据的管理的功能。 我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。 -![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/KafkaTopicPartionsLayout.png) +![](https://oss.javaguide.cn/github/javaguide/high-performance/message-queue/KafkaTopicPartionsLayout.png) 每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 **Kafka 只能为我们保证 Partition(分区) 中的消息有序。** @@ -136,7 +152,7 @@ Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data 当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的, -### Kafka 如何保证消息不丢失 +### Kafka 如何保证消息不丢失? #### 生产者丢失消息的情况 @@ -164,7 +180,7 @@ if (sendResult.getRecordMetadata() != null) { 如果消息发送失败的话,我们检查失败的原因之后重新发送即可! -**另外这里推荐为 Producer 的`retries `(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了** +另外,这里推荐为 Producer 的`retries`(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。 #### 消费者丢失消息的情况 @@ -204,7 +220,7 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后 我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 **unclean.leader.election.enable = false** 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。 -### Kafka 如何保证消息不重复消费 +### Kafka 如何保证消息不重复消费? **kafka 出现消息重复消费的原因:** @@ -218,35 +234,38 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后 - 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样 - 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。 -# kafka的重试机制 -网上关于 Spring kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 Spring-kafka-2.9.3 源码重新梳理一下。 +## Kafka 重试机制 -## 消费失败会怎么样 +在 Kafka 如何保证消息不丢失这里,我们提到了 Kafka 的重试机制。由于这部分内容较为重要,我们这里再来详细介绍一下。 + +网上关于 Spring Kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 [spring-kafka-2.9.3](https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka/2.9.3) 源码重新梳理一下。 + +### 消费失败会怎么样? 在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了? 生产者代码: - ```Java - for (int i = 0; i < 10; i++) { - kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i)) - } - ``` +```Java + for (int i = 0; i < 10; i++) { + kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i)) + } +``` 消费者消代码: - ```Java - @KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple") - private void customer(String message) throws InterruptedException { - log.info("kafka customer:{}",message); - Integer n = Integer.parseInt(message); - if (n%5==0){ - throw new RuntimeException(); - } - } - ``` +```Java + @KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple") + private void customer(String message) throws InterruptedException { + log.info("kafka customer:{}",message); + Integer n = Integer.parseInt(message); + if (n%5==0){ + throw new RuntimeException(); + } + } +``` -在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。 +在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 `test-0@95` 重试多次后会被跳过。 ```Java 2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95 @@ -255,23 +274,66 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后 ``` -## 默认会重试多少次? +因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。 + +### 默认会重试多少次? 默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔? -10 次。看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑: -```Java - FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition); - this.retryListeners.forEach(rl -> - rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get())); - long nextBackOff = failedRecord.getBackOffExecution().nextBackOff(); - if (nextBackOff != BackOffExecution.STOP) { - this.backOffHandler.onNextBackOff(container, exception, nextBackOff); - return false; - } +看源码 `FailedRecordTracker` 类有个 `recovered` 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑: + +```java + @Override + public boolean recovered(ConsumerRecord << ? , ? > record, Exception exception, + @Nullable MessageListenerContainer container, + @Nullable Consumer << ? , ? > consumer) throws InterruptedException { + + if (this.noRetries) { + // 不支持重试 + attemptRecovery(record, exception, null, consumer); + return true; + } + // 取已经失败的消费记录集合 + Map < TopicPartition, FailedRecord > map = this.failures.get(); + if (map == null) { + this.failures.set(new HashMap < > ()); + map = this.failures.get(); + } + // 获取消费记录所在的Topic和Partition + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition); + // 通知注册的重试监听器,消息投递失败 + this.retryListeners.forEach(rl - > + rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get())); + // 获取下一次重试的时间间隔 + long nextBackOff = failedRecord.getBackOffExecution().nextBackOff(); + if (nextBackOff != BackOffExecution.STOP) { + this.backOffHandler.onNextBackOff(container, exception, nextBackOff); + return false; + } else { + attemptRecovery(record, exception, topicPartition, consumer); + map.remove(topicPartition); + if (map.isEmpty()) { + this.failures.remove(); + } + return true; + } + } ``` -其中 BackOffExecution.STOP 的值为 -1,nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。 +其中, `BackOffExecution.STOP` 的值为 -1。 + +```java +@FunctionalInterface +public interface BackOffExecution { + + long STOP = -1; + long nextBackOff(); + +} +``` + +`nextBackOff` 的值调用 `BackOff` 类的 `nextBackOff()` 函数。如果当前执行次数大于最大执行次数则返回 `STOP`,既超过这个最大执行次数后才会停止重试。 ```Java public long nextBackOff() { @@ -285,7 +347,7 @@ public long nextBackOff() { } ``` -那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是: +那么这个 `getMaxAttempts` 的值又是多少呢?回到最开始,当执行出错会进入 `DefaultErrorHandler` 。`DefaultErrorHandler` 默认的构造函数是: ```Java public DefaultErrorHandler() { @@ -293,7 +355,7 @@ public DefaultErrorHandler() { } ``` -SeekUtils.DEFAULT_BACK_OFF 定义的是: +`SeekUtils.DEFAULT_BACK_OFF` 定义的是: ```Java public static final int DEFAULT_MAX_FAILURES = 10; @@ -301,11 +363,13 @@ public static final int DEFAULT_MAX_FAILURES = 10; public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1); ``` -DEFAULT_MAX_FAILURES 的值是10,currentAttempts从0到9,所以总共会执行10次,每次重试的时间间隔为0。 +`DEFAULT_MAX_FAILURES` 的值是 10,`currentAttempts` 从 0 到 9,所以总共会执行 10 次,每次重试的时间间隔为 0。 -## 如何自定义重试次数,以及时间间隔 +最后,简单总结一下:Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。 -从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。 +### 如何自定义重试次数以及时间间隔? + +从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 `FixedBackOff` 控制的,`FixedBackOff` 是 `DefaultErrorHandler` 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 `DefaultErrorHandler` 初始化的时候传入自定义的 `FixedBackOff` 即可。重新实现一个 `KafkaListenerContainerFactory` ,调用 `setCommonErrorHandler` 设置新的自定义的错误处理器就可以实现。 ```Java @Bean @@ -319,9 +383,9 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFacto } ``` -## 如何在重试失败后进行告警 +### 如何在重试失败后进行告警? -自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。 +自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 `DefaultErrorHandler` 的 `handleRemaining` 函数,加上自定义的告警等操作。 ```Java @Slf4j @@ -339,17 +403,19 @@ public class DelErrorHandler extends DefaultErrorHandler { } } ``` -DefaultErrorHandler 只是默认的一个错误处理器,Spring kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。 -## 重试失败后的数据如何再次处理 +`DefaultErrorHandler` 只是默认的一个错误处理器,Spring Kafka 还提供了 `CommonErrorHandler` 接口。手动实现 `CommonErrorHandler` 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。 + +### 重试失败后的数据如何再次处理? 当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢? -死信队列(Dead Letter Queue,简称DLQ)是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。 +**死信队列(Dead Letter Queue,简称 DLQ)** 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。 `@RetryableTopic` 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。 ```Java +// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒 @RetryableTopic( attempts = "5", backoff = @Backoff(delay = 100, maxDelay = 1000) @@ -365,17 +431,9 @@ private void customer(String message) { } ``` -这个例子在listen方法上使用@RetryableTopic注解,配置了: -- 重试5次 -- 重试间隔100毫秒,最大间隔1秒 +当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 `@DltHandler` 处理,也可以使用 `@KafkaListener` 重新消费。 -重试完毕后,如果仍然失败,则会进入消息队列,此时就存在两个队列: -- test 原队列 -- test-dlt 原队列对应的死信队列 - -对于死信队列的处理,既可以用 `@DltHandler` 处理,也可以使用 `@KafkaListener` 重新消费。 - -### Reference +## 参考 - Kafka 官方文档:https://kafka.apache.org/documentation/ - 极客时间—《Kafka 核心技术与实战》第 11 节:无消息丢失配置怎么实现?