1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

update kafka-questions-01.md

This commit is contained in:
jun 2023-08-10 19:35:57 +08:00
parent dd3638d568
commit df4f2e3516

View File

@ -277,9 +277,9 @@ public long nextBackOff() {
```
那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是:
```Java
public DefaultErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
public DefaultErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}
```
SeekUtils.DEFAULT_BACK_OFF 定义的是
```Java
@ -297,21 +297,21 @@ DEFAULT_MAX_FAILURES 的值是10currentAttempts从0到9所以总共会执
spring kafka 中只需要加上 `@DltHandler` 注解即可将重试失败的消息推到死信队列死信队列的topic是在原 topic 后加上 '.DLT'。然后开启新的消费者消费死信队列即可。
```Java
@DltHandler
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
@DltHandler
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
@KafkaListener(topics = {KafkaConst.TEST_TOPIC + ".DLT"}, groupId = "apple")
private void delCustomer(String message) {
//
}
@KafkaListener(topics = {KafkaConst.TEST_TOPIC + ".DLT"}, groupId = "apple")
private void delCustomer(String message) {
//
}
```
## 如何自定义重试次数,以及时间间隔
"......,未完待续。"