数据存储

Kafka 速度的秘诀在于,它把所有的消息都变成一个的文件。通过 mmap 提高 IO 速度,写入数据的时候它是末尾添加所以速度最优;读取数据的时候配合 sendfile 直接暴力输出。阿里的 RocketMQ 也是这种模式,只不过是用 Java 写的。

Terminology

  • Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker

  • Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic。(物理上不同 topic 的消息分开存储,逻辑上一个 topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 topic 即可生产或消费数据而不必关心数据存于何处)

  • Partition:parition 是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时可指定 parition 数量。每个 partition 对应于一个文件夹,该文件夹下存储该 partition 的数据和索引文件

  • Producer:负责发布消息到 Kafka broker

  • Consumer:消费消息。每个 consumer 属于一个特定的 consumer group(可为每个 consumer 指定 group name,若不指定 group name 则属于默认的 group)。使用 consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group 内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。

Producer

Producer 负责向 Kafka 中提交数据,Cluster 会将所有收到的消息写入到硬盘中而绝不会丢失数据。在这其中,Kafka 为了优化写入速度采用了顺序写入与 MMFile。

顺序写入

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机 IO,最喜欢顺序 IO。为了提高读写硬盘的速度,Kafka 就是使用顺序 IO。上图就展示了 Kafka 是如何写入数据的,每一个 Partition 其实都是一个文件,收到消息后 Kafka 会把数据插入到文件末尾(虚框部分)。这种方法有一个缺陷——没有办法删除数据,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个 Topic 都有一个 offset 用来表示读取到了第几条数据。 上图中有两个消费者,Consumer1 有两个 offset 分别对应 Partition0、Partition1(假设每一个 Topic 一个 Partition);Consumer2 有一个 offset 对应 Partition2。这个 offset 是由客户端 SDK 负责保存的,Kafka 的 Broker 完全无视这个东西的存在;一般情况下 SDK 会把它保存到 zookeeper 里面。(所以需要给 Consumer 提供 zookeeper 的地 址)。 如果不删除硬盘肯定会被撑满,所以 Kakfa 提供了两种策略来删除数据。一是基于时间,二是基于 partition 文件大小。具体配置可以参看它的配置文档。

Memory Mapped Files

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以 Kafka 的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高 IO 效率。 Memory Mapped Files(后面简称 mmap)也被翻译成内存映射文件,在 64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。 通过 mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。使用这种方式可以获取很大的 IO 提升,省去了用户空间到内核空间复制的开销(调用文件的 read 会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。Kafka 提供了一个参数——producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync);写入 mmap 之后立即返回 Producer 不调用 flush 叫异步(async)。mmap 其实是 Linux 中的一个函数就是用来实现内存映射的,谢谢 Java NIO,它给我提供了一个 mappedbytebuffer 类可以用来实现内存映射(所以是沾了 Java 的光才可以如此神速和 Scala 没关系!!)

Consumer With Zero Copy

ZeroMQ 完全没有任何服务器节点,也不会使用硬盘,按照道理说它应该比 Kafka 快。可是实际测试下来它的速度还是被 Kafka“吊打”。“一个用硬盘的比用内存的快”,这绝对违反常识;如果这种事情发生说明——它作弊了。 没错,Kafka“作弊”。无论是顺序写入还是 mmap 其实都是作弊的准备工作。仔细想一下,一个 Web Server 传送一个静态文件,如何优化?答案是 zero copy。传统模式下我们从硬盘读取一个文件是这样的

img 先复制到内核空间(read 是系统调用,放到了 DMA,所以用内核空间),然后复制到用户空间(1,2);从用户空间重新复制到内核空间(你用的 socket 是系统调用,所以它也有自己的内核空间),最后发送给网卡(3、4)。

img Zero Copy 中直接从内核空间(DMA 的)到内核空间(Socket 的),然后发送网卡。 这个技术非常普遍,The C10K problem 里面也有很详细的介绍,Nginx 也是用的这种技术,稍微搜一下就能找到很多资料。

Java 的 NIO 提供了 FileChannle,它的 transferTo、transferFrom 方法就是 Zero Copy。 Kafka 把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候 Kafka 直接把“文件”发送给消费者。这就是秘诀所在,比如:10W 的消息组合在一起是 10MB 的数据量,然后 Kafka 用类似于发文件的方式直接扔出去了,如果消费者和生产者之间的网络非常好(只要网络稍微正常一点 10MB 根本不是事。。。家里上网都是 100Mbps 的带宽了),10MB 可能只需要 1s。所以答案是——10W 的 TPS,Kafka 每秒钟处理了 10W 条消息。 可能你说:不可能把整个文件发出去吧?里面还有一些不需要的消息呢?是的,Kafka 作为一个“高级作弊分子”自然要把作弊做的有逼格。Zero Copy 对应的是 sendfile 这个函数(以 Linux 为例),这个函数接受

out_fd作为输出(一般及时socket的句柄)
in_fd作为输入文件句柄
off_t表示in_fd的偏移(从哪里开始读取)
size_t表示读取多少个

没错,Kafka 是用 mmap 作为文件读写方式的,它就是一个文件句柄,所以直接把它传给 sendfile;偏移也好解决,用户会自己保持这个 offset,每次请求都会发送这个 offset。(还记得吗?放在 zookeeper 中的);数据量更容易解决了,如果消费者想要更快,就全部扔给消费者。如果这样做一般情况下消费者肯定直接就被压死了;所以 Kafka 提供了的两种方式——Push,我全部扔给你了,你死了不管我的事情;Pull,好吧你告诉我你需要多少个,我给你多少个。