1
0
mirror of https://github.com/Snailclimb/JavaGuide synced 2025-06-16 18:10:13 +08:00

kafka 入门看这一篇就够了

This commit is contained in:
Kou Shuang 2019-10-10 13:57:04 +08:00
parent 87e52b998e
commit 4321d7a068
4 changed files with 232 additions and 0 deletions

View File

@ -1,3 +1,5 @@
>
>
> 本文由 JavaGuide 读者推荐JavaGuide 对文章进行了整理排版原文地址https://www.wmyskxz.com/2019/07/17/kafka-ru-men-jiu-zhe-yi-pian/ 作者:我没有三颗心脏。
# 一、Kafka 简介
@ -79,7 +81,237 @@ Kafka 的一个关键性质是日志保留retention我们可以配置
![主题Topic与分区Partition](./../../../media/pictures/kafaka/kafka存在文件系统上.png)
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。
每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition中。
## 讨论二Kafka 中的底层存储设计
假设我们现在 Kafka 集群只有一个 Broker我们创建 2 个 Topic 名称分别为「topic1」和「topic2」Partition 数量分别为 1、2那么我们的根目录下就会创建如下三个文件夹
```shell
| --topic1-0
| --topic2-0
| --topic2-1
```
在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 **Segment File**Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件。
现在假设我们设置每个 Segment 大小为 500 MB并启动生产者向 topic1 中写入大量数据topic1-0 文件夹中就会产生类似如下的一些文件:
```shell
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index | --00000000000001105814.log
| --topic2-0
| --topic2-1
```
**Segment 是 Kafka 文件存储的最小单位。**Segment 文件命名规则Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小19 位数字字符长度没有数字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
![索引文件和数据文件](./../../../media/pictures/kafaka/segment是kafka文件存储的最小单位.png)
其中以索引文件中元数据 `<3, 497>` 为例,依次在数据文件中表示第 3 个 message在全局 Partition 表示第 368769 + 3 = 368772 个 message以及该消息的物理偏移地址为 497。
注意该 index 文件并不是从0开始也不是每次递增1的这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,使得能够把 index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。
因为其文件名为上一个 Segment 最后一条消息的 offset ,所以当需要查找一个指定 offset 的 message 时,通过在所有 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。
由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这种顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。
> Kafka 是如何准确的知道 message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了以下三个属性:
>
> - offset表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 message可以简单的认为是一个 id
> - MessageSize表示 message 内容 data 的大小;
> - datamessage 的具体内容
## 讨论三:生产者设计概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka另一个服务来读取消息并根据规则引擎来检查交易是否通过将结果通过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:
![生产者设计概要](./../../../media/pictures/kafaka/生产者设计概要.png)
图片来源:[http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/12/kafka-producer.html](http://www.dengshenyu.com/分布式系统/2017/11/12/kafka-producer.html)
流程如下:
1. 首先我们需要创建一个ProducerRecord这个对象需要包含消息的主题topic和值value可以选择性指定一个键值key或者分区partition
2. 发送消息时生产者会对键值和值序列化成字节数组然后发送到分配器partitioner
3. 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
4. 选择完分区后生产者知道了消息所属的主题和分区它将这条记录添加到相同主题和分区的批量消息中另一个线程负责发送这些批量消息到对应的Kafka broker。
5. 当broker接收到消息后如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象否则返回异常。
6. 生产者接收到结果后,对于异常可能会进行重试。
## 讨论四:消费者设计概要
### 消费者与消费组
假设这么个场景我们从Kafka中读取消息并且进行检查最后产生结果数据。我们可以创建一个消费者实例去做这件事情但如果生产者写入消息的速度比消费者读取的速度快怎么办呢这样随着时间增长消息堆积越来越严重。对于这种场景我们需要增加多个消费者来进行水平扩展。
Kafka消费者是**消费组**的一部分当多个消费者形成一个消费组来消费主题时每个消费者会收到不同分区的消息。假设有一个T1主题该主题有4个分区同时我们有一个消费组G1这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息如下所示
![生产者设计概要](./../../../media/pictures/kafaka/消费者设计概要1.png)
如果我们增加新的消费者C2到消费组G1那么每个消费者将会分别收到两个分区的消息如下所示
![生产者设计概要](./../../../media/pictures/kafaka/消费者设计概要2.png)
如果增加到4个消费者那么每个消费者将会分别收到一个分区的消息如下所示
![生产者设计概要](./../../../media/pictures/kafaka/消费者设计概要3.png)
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
![生产者设计概要](./../../../media/pictures/kafaka/消费者设计概要4.png)
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
**Kafka一个很重要的特性就是只需写入一次消息可以支持任意多的应用读取这个消息。**换句话说每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息应用需要有不同的消费组。对于上面的例子假如我们新增了一个新的消费组G2而这个消费组有两个消费者那么会是这样的
![生产者设计概要](./../../../media/pictures/kafaka/消费者设计概要5.png)
在这个场景中消费组G1和消费组G2都能收到T1主题的全量消息在逻辑意义上来说它们属于不同的应用。
最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
### 消费组与分区重平衡
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为**重平衡rebalance**。重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。**不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。**而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳hearbeat到一个作为组协调者group coordinator的 broker 来保持在消费组内存活。这个 broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。
如果消费者超过一定时间没有发送心跳那么它的会话session就会过期组协调者会认为该消费者已经宕机然后触发重平衡。可以看到从消费者宕机到会话过期是有一定时间的这段时间内该消费者的分区都不能进行消息消费通常情况下我们可以进行优雅关闭这样消费者会发送离开的消息到组协调者这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活这个配置可以避免活锁livelock。活锁是指应用没有故障但是由于某些原因不能进一步消费。
### Partition 与消费模型
上面提到Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
下一个问题是Partition 中的消息可以被(不同的 Consumer Group多次消费那 Partition中被消费的消息是何时删除的 Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费2 天后,消息自动被删除。
Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。 如下图:
![生产者设计概要](./../../../media/pictures/kafaka/Partition与消费模型.png)
### 为什么 Kafka 是 pull 模型
消费者应该向 Broker 要数据pull还是 Broker 向消费者推送数据push作为一个消息系统Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system比如 Facebook 的[Scribe](https://github.com/facebookarchive/scribe)和 Cloudera 的[Flume](https://flume.apache.org/),采用 push 模式。事实上push 模式和 pull 模式各有优劣。
**push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。**push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。**而 pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。**
**对于 Kafka 而言pull 模式更合适。**pull 模式可简化 broker 的设计Consumer 可自主控制消费消息的速率,同时 Consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
## 讨论五Kafka 如何保证可靠性
当我们讨论**可靠性**的时候,我们总会提到*保证**这个词语。可靠性保证是基础我们基于这些基础之上构建我们的应用。比如关系型数据库的可靠性保证是ACID也就是原子性Atomicity、一致性Consistency、隔离性Isolation和持久性Durability
Kafka 中的可靠性保证有如下四点:
- 对于一个分区来说它的消息是有序的。如果一个生产者向一个分区先写入消息A然后写入消息B那么消费者会先读取消息A再读取消息B。
- 当消息写入所有in-sync状态的副本后消息才会认为**已提交committed**。这里的写入有可能只是写入到文件系统的缓存不一定刷新到磁盘。生产者可以等待不同时机的确认比如等待分区主副本写入即返回后者等待所有in-sync状态副本写入才返回。
- 一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
- 消费者只能读取到已提交的消息。
使用这些基础保证,我们构建一个可靠的系统,这时候需要考虑一个问题:究竟我们的应用需要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。因此,我们往往需要做权衡,一味的追求可靠性并不实际。
> 想了解更多戳这里http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html
三、动手搭一个 Kafka
通过上面的描述我们已经大致了解到了「Kafka」是何方神圣了现在我们开始尝试自己动手本地搭一个来实际体验一把。
## 第一步:下载 Kafka
这里以 Mac OS 为例,在安装了 Homebrew 的情况下执行下列代码:
```shell
brew install kafka
```
由于 Kafka 依赖了 Zookeeper所以在下载的时候会自动下载。
## 第二步:启动服务
我们在启动之前首先需要修改 Kafka 的监听地址和端口为 `localhost:9092`
```shell
vi /usr/local/etc/kafka/server.properties
```
然后修改成下图的样子:
![启动服务](./../../../media/pictures/kafaka/启动服务.png)
依次启动 Zookeeper 和 Kafka
```shell
brew services start zookeeper
brew services start kafka
```
然后执行下列语句来创建一个名字为 “test” 的 Topic
```shell
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```
我们可以通过下列的命令查看我们的 Topic 列表:
```shell
kafka-topics --list --zookeeper localhost:2181
```
## 第三步:发送消息
然后我们新建一个控制台,运行下列命令创建一个消费者关注刚才创建的 Topic
```shell
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
```
用控制台往刚才创建的 Topic 中添加消息,并观察刚才创建的消费者窗口:
```shel
kafka-console-producer --broker-list localhost:9092 --topic test
```
能通过消费者窗口观察到正确的消息:
![发送消息](./../../../media/pictures/kafaka/发送消息.png)
# 参考资料
------
1. https://www.infoq.cn/article/kafka-analysis-part-1 - Kafka 设计解析Kafka 背景及架构介绍
2. [http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/06/kafka-Meet-Kafka.html](http://www.dengshenyu.com/分布式系统/2017/11/06/kafka-Meet-Kafka.html) - Kafka系列初识Kafka
3. https://lotabout.me/2018/kafka-introduction/ - Kafka 入门介绍
4. https://www.zhihu.com/question/28925721 - Kafka 中的 Topic 为什么要进行分区? - 知乎
5. https://blog.joway.io/posts/kafka-design-practice/ - Kafka 的设计与实践思考
6. [http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html](http://www.dengshenyu.com/分布式系统/2017/11/21/kafka-data-delivery.html) - Kafka系列可靠的数据传输

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 496 KiB