背景介绍
kafka 是最初由 Linkedin 公司开发,使用 Scala 语言编写,Kafka 是一个分布式、分区的、多副本的、多订阅者的分布式 MQ 系统,可以用于 web/nginx 日志,搜索日志,监控日志,访问日志等等。
kafka 目前支持多种客户端语言:java,python,c++,php 等等。
总体结构
kafka 名词解释和工作方式
- Producer :消息生产者,就是向 kafka broker 发消息的客户端。
- Consumer :消息消费者,向 kafka broker 取消息的客户端
- Topic :咋们可以理解为一个队列。
- Consumer Group (CG):这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 CG。topic 的消息会复制(不是真的复制,是概念上的)到所有的 CG,但每个 CG 只会把消息发给该 CG 中的一个 consumer。如果需要实现广播,只要每个 consumer 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG 还可以将 consumer 进行自由的分组而不需要多次发送消息到不同的 topic。
- Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 - partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition 中的顺序将消息发给 consumer,不保证一个 topic 的整体(多个 partition 间)的顺序。
- Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka
kafka 特性
- 通过 O (1) 的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 的消息存储也能够保持长时间的稳定性能。
- 高吞吐量:即使是非常普通的硬件 kafka 也可以支持每秒数十万的消息。
- 支持同步和异步复制两种 HA
- Consumer 客户端 pull,随机读,利用 sendfile 系统调用,zero-copy , 批量拉数据
- 消费状态保存在客户端
- 消息存储顺序写
- 数据迁移、扩容对用户透明
- 支持 Hadoop 并行数据加载。
- 支持 online 和 offline 的场景。
- 持久化:通过将数据持久化到硬盘以及 replication 防止数据丢失。
- scale out:无需停机即可扩展机器。
- 定期删除机制,支持设定 partitions 的 segment file 保留时间。
可靠性(一致性)
kafka (MQ) 要实现从 producer 到 consumer 之间的可靠的消息传送和分发。传统的 MQ 系统通常都是通过 broker 和 consumer 间的确认(ack)机制实现的,并在 broker 保存消息分发的状态。
即使这样一致性也是很难保证的。kafka 的做法是由 consumer 自己保存状态,也不要任何确认。这样虽然 consumer 负担更重,但其实更灵活了。
因为不管 consumer 上任何原因导致需要重新处理消息,都可以再次从 broker 获得。
kafak 系统扩展性
kafka 使用 zookeeper 来实现动态的集群扩展,不需要更改客户端(producer 和 consumer)的配置。broker 会在 zookeeper 注册并保持相关的元数据(topic,partition 信息等)更新。
而客户端会在 zookeeper 上注册相关的 watcher。一旦 zookeeper 发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除 broker 时,各 broker 间仍能自动实现负载均衡。
kafka 设计目标
高吞吐量是其核心设计之一。
- 数据磁盘持久化:消息不在内存中 cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
- zero-copy:减少 IO 操作步骤。
- 支持数据批量发送和拉取。
- 支持数据压缩。
- Topic 划分为多个 partition,提高并行处理能力。
Producer 负载均衡和 HA 机制
- producer 根据用户指定的算法,将消息发送到指定的 partition。
- 存在多个 partiiton,每个 partition 有自己的 replica,每个 replica 分布在不同的 Broker 节点上。
- 多个 partition 需要选取出 lead partition,lead partition 负责读写,并由 zookeeper 负责 fail over。
- 通过 zookeeper 管理 broker 与 consumer 的动态加入与离开。
Consumer 的 pull 机制
由于 kafka broker 会持久化数据,broker 没有 cahce 压力,因此,consumer 比较适合采取 pull 的方式消费数据,具体特别如下:
- 简化 kafka 设计,降低了难度。
- consumer 根据消费能力自主控制消息拉取速度。
- consumer 根据自身情况自主选择消费模式,例如批量,重复消费,从指定 partition 或位置 (offset) 开始消费等.
Consumer 与 topic 关系以及机制
kafka 只支持 Topic. 每个 consumer 属于一个 consumer group,每个 group 中可以有多个 consumer.
Topic 中的一条特定的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费,此消息不会发送给一个 group 的多个 consumer; 那么一个 group 中所有的 consumer 将会交错的消费整个 Topic.
如果所有的 consumer 都具有相同的 group, 这种情况和 JMS queue 模式很像;
消息将会在 consumers 之间负载均衡.如果所有的 consumer 都具有不同的 group, 那这就是 “发布 - 订阅”; 消息将会广播给所有的消费者
每个 group 中 consumer 消息消费互相独立(我们可以认为一个 group 是一个 “订阅” 者,)
一个 group 中会包含多个 consumer, 这样不仅可以提高 topic 中消息的并发消费能力,而且还能提高 “故障容错” 性,如果 group 中的某个 consumer 失效,
那么其消费的 partitions 将会有其他 consumer 自动接管.kafka 的设计原理决定,对于一个 topic, 同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,
否则将意味着某些 consumer 将无法得到消息
Producer 均衡算法
kafka 集群中的任何一个 broker, 都可以向 producer 提供 metadata 信息,这些 metadata 中包含 “集群中存活的 servers 列表”/“partitions leader 列表”等信息 (请参看 zookeeper 中的节点信息).
当 producer 获取到 metadata 信心之后,producer 将会和 Topic 下所有 partition leader 保持 socket 连接;
消息由 producer 直接通过 socket 发送到 broker, 中间不会经过任何 “路由层”. 事实上,消息被路由到哪个 partition 上,有 producer 客户端决定.比如可以采用 “random””key-hash””轮询” 等,如果一个 topic 中有多个 partitions, 那么在 producer 端实现 “消息均衡分发” 是必要的.
在 producer 端的配置文件中,开发者可以指定 partition 路由的方式.
consumer均衡算法
当一个 group 中,有 consumer 加入或者离开时,会触发 partitions 均衡。均衡的最终目的,是提升 topic 的并发消费能力.
- 假如 topic1, 具有如下 partitions: P0,P1,P2,P3
- 加入 group 中,有如下 consumer: C0,C1
- 首先根据 partition 索引号对 partitions 排序: P0,P1,P2,P3
- 根据 consumer.id 排序: C0,C1
- 计算倍数: M = [P0,P1,P2,P3].size/ [C0,C1].size, 本例值 M=2 (向上取整)
- 然后依次分配 partitions: C0 = [P0,P1],C1=[P2,P3], 即 Ci = [P (i * M),P ((i + 1) * M -1)]
kafka broker 集群内 broker 之间 replica 机制
kafka 中,replication 策略是基于 partition, 而不是 topic;
kafka 将每个 partition 数据复制到多个 server 上,任何一个 partition 有一个 leader 和多个 follower (可以没有);
备份的个数可以通过 broker 配置文件来设定.leader 处理所有的 read-write 请求,follower 需要和 leader 保持同步.Follower 就像一个 “consumer”,
消费消息并保存在本地日志中;leader 负责跟踪所有的 follower 状态,如果 follower”落后” 太多或者失效,leader 将会把它从 replicas 同步列表中删除.
当所有的 follower 都将一条消息保存成功,此消息才被认为是 “committed”, 那么此时 consumer 才能消费它,这种同步策略,就要求 follower 和 leader 之间必须具有良好的网络环境.
即使只有一个 replicas 实例存活,仍然可以保证消息的正常发送和接收,只要 zookeeper 集群存活即可.(备注:不同于其他分布式存储,比如 hbase 需要 “多数派” 存活才行)
kafka 判定一个 follower 存活与否的条件有 2 个
- follower 需要和 zookeeper 保持良好的链接
- 它必须能够及时的跟进 leader, 不能落后太多
如果一个 follower 失效 (server 失效) 或者落后太多,leader 将会把它从同步列表中移除 [备注:如果此 replicas 落后太多,它将会继续从 leader 中 fetch 数据,直到足够 up-to-date然后再次加入到同步列表中;]
kafka 不会更换 replicas 宿主!因为 “同步列表” 中 replicas 需要足够快,这样才能保证 producer 发布消息时接受到 ACK 的延迟较小。
总结
Producer 端直接连接 broker.list 列表,从列表中返回 TopicMetadataResponse, 该 Metadata 包含 Topic 下每个 partition leader 建立 socket 连接并发送消息.
Broker 端使用 zookeeper 用来注册 broker 信息,以及监控 partition leader 存活性.
Consumer 端使用 zookeeper 用来注册 consumer 信息,其中包括 consumer 消费的 partition 列表等,同时也用来发现 broker 列表,并和 partition leader 建立 socket 连接,并获取消息.
性能测试
见: 压力测试