流计算框架对比

几个月之前我们在这里讨论过目前对于这种日渐增加的分布式流计算的需求的原因。当然,目前也有很多的各式各样的框架被用于处理这一些问题。现在我们会在这篇文章中进行回顾,来讨论下各种框架之间的相似点以及区别在哪里,还有就是从我的角度分析的,推荐的适用的用户场景。 如你所想,

DAG 主要功能即是用图来表示链式的任务组合,而在流处理系统中,我们便常常用 DAG 来描述一个流工作的拓扑。笔者自己是从 Akka 的 Stream 中的术语得到了启发。如下图所示,数据流经过一系列的处理器从源点流动到了终点,也就是用来描述这流工作。谈到 Akka 的 Streams,我觉得要着重强调下分布式这个概念,因为即使也有一些单机的解决方案可以创建并且运行 DAG,但是我们仍然着眼于那些可以运行在多机上的解决方案。

Points of Interest

在不同的系统之间进行选择的时候,我们主要关注到以下几点。

  • Runtime and Programming model(运行与编程模型)

    一个平台提供的编程模型往往会决定很多它的特性,并且这个编程模型应该足够处理所有可能的用户案例。这是一个决定性的因素,我也会在下文中多次讨论。

  • Functional Primitives(函数式单元)

    一个合格的处理平台应该能够提供丰富的能够在独立信息级别进行处理的函数,像 map、filter 这样易于实现与扩展的一些函数。同样也应提供像 aggregation 这样的跨信息处理函数以及像 join 这样的跨流进行操作的函数,虽然这样的操作会难以扩展。

  • State Management(状态管理)

    大部分这些应用都有状态性的逻辑处理过程,因此,框架本身应该允许开发者去维护、访问以及更新这些状态信息。

  • Message Delivery Guarantees(消息投递的可达性保证)

    一般来说,对于消息投递而言,我们有至多一次(at most once)、至少一次(at least once)以及恰好一次(exactly once)这三种方案。 - at most once

    At most once 投递保证每个消息会被投递 0 次或者 1 次,在这种机制下消息很有可能会丢失。 - at least once

    At least once 投递保证了每个消息会被默认投递多次,至少保证有一次被成功接收,信息可能有重复,但是不会丢失。 - exactly once

    exactly once 意味着每个消息对于接收者而言正好被接收一次,保证即不会丢失也不会重复。

  • Failures Handling

    在一个流处理系统中,错误可能经常在不同的层级发生,譬如网络分割、磁盘错误或者某个节点莫名其妙挂掉了。平台要能够从这些故障中顺利恢复,并且能够从最后一个正常的状态继续处理而不会损害结果。

除此之外,我们也应该考虑到平台的生态系统、社区的完备程度,以及是否易于开发或者是否易于运维等等。

RunTime and Programming Model

运行环境与编程模型可能是某个系统的最重要的特性,因为它定义了整个系统的呈现特性、可能支持的操作以及未来的一些限制等等。因此,运行环境与编程模型就确定了系统的能力与适用的用户案例。目前,主要有两种不同的方法来构建流处理系统,其中一个叫 Native Streaming,意味着所有输入的记录或者事件都会根据它们进入的顺序一个接着一个的处理。

另一种方法叫做 Micro-Batching。大量短的 Batches 会从输入的记录中创建出然后经过整个系统的处理,这些 Batches 会根据预设好的时间常量进行创建,通常是每隔几秒创建一批。

两种方法都有一些内在的优势与不足,首先来谈谈 Native Streaming。好的一方面呢是 Native Streaming 的表现性会更好一点,因为它是直接处理输入的流本身的,并没有被一些不自然的抽象方法所限制住。同时,因为所有的记录都是在输入之后立马被处理,这样对于请求方而言响应的延迟就会优于那种 Micro-Batching 系统。处理这些,有状态的操作符也会更容易被实现,我们在下文中也会描述这个特点。不过 Native Streaming 系统往往吞吐量会比较低,并且因为它需要去持久化或者重放几乎每一条请求,它的容错的代价也会更高一些。并且负载均衡也是一个不可忽视的问题,举例而言,我们根据键对数据进行了分割并且想做进一步地处理。如果某些键对应的分区因为某些原因需要更多地资源去处理,那么这个分区往往就会变成整个系统的瓶颈。 而对于 Micro-Batching 而言,将流切分为小的 Batches 不可避免地会降低整个系统的变现性,也就是可读性。而一些类似于状态管理的或者 joins、splits 这些操作也会更加难以实现,因为系统必须去处理整个 Batch。另外,每个 Batch 本身也将架构属性与逻辑这两个本来不应该被糅合在一起的部分相连接了起来。而 Micro-Batching 的优势在于它的容错与负载均衡会更加易于实现,它只要简单地在某个节点上处理失败之后转发给另一个节点即可。最后,值得一提的是,我们可以在 Native Streaming 的基础上快速地构建 Micro-Batching 的系统。 而对于编程模型而言,又可以分为 Compositional(组合式)与 Declarative(声明式)。组合式会提供一系列的基础构件,类似于源读取与操作符等等,开发人员需要将这些基础构件组合在一起然后形成一个期望的拓扑结构。新的构件往往可以通过继承与实现某个接口来创建。另一方面,声明式 API 中的操作符往往会被定义为高阶函数。声明式编程模型允许我们利用抽象类型和所有其他的精选的材料来编写函数式的代码以及优化整个拓扑图。同时,声明式 API 也提供了一些开箱即用的高等级的类似于窗口管理、状态管理这样的操作符。下文中我们也会提供一些代码示例。

Apache Streaming Landscape

目前已经有了各种各样的流处理框架,自然也无法在本文中全部攘括。所以我必须将讨论限定在某些范围内,本文中是选择了所有 Apache 旗下的流处理的框架进行讨论,并且这些框架都已经提供了 Scala 的语法接口。主要的话就是 Storm 以及它的一个改进 Trident Storm,还有就是当下正火的 Spark。最后还会讨论下来自 LinkedIn 的 Samza 以及比较有希望的 Apache Flink。笔者个人觉得这是一个非常不错的选择,因为虽然这些框架都是出于流处理的范畴,但是他们的实现手段千差万别。

  • Apache Storm 最初由 Nathan Marz 以及他的 BackType 的团队在 2010 年创建。后来它被 Twitter 收购并且开源出来,并且在 2014 年变成了 Apache 的顶层项目。毫无疑问,Storm 是大规模流处理中的先行者并且逐渐成为了行业标准。Storm 是一个典型的 Native Streaming 系统并且提供了大量底层的操作接口。另外,Storm 使用了 Thrift 来进行拓扑的定义,并且提供了大量其他语言的接口。

  • Trident 是一个基于 Storm 构建的上层的 Micro-Batching 系统,它简化了 Storm 的拓扑构建过程并且提供了类似于窗口、聚合以及状态管理等等没有被 Storm 原生支持的功能。另外,Storm 是实现了至多一次的投递原则,而 Trident 实现了恰巧一次的投递原则。Trident 提供了 Java, Clojure 以及 Scala 接口。

  • 众所周知,Spark 是一个非常流行的提供了类似于 SparkSQL、Mlib 这样内建的批处理框架的库,并且它也提供了 Spark Streaming 这样优秀地流处理框架。Spark 的运行环境提供了批处理功能,因此,Spark Streaming 毫无疑问是实现了 Micro-Batching 机制。输入的数据流会被接收者分割创建为 Micro-Batches,然后像其他 Spark 任务一样进行处理。Spark 提供了 Java, Python 以及 Scala 接口。

  • Samza 最早是由 LinkedIn 提出的与 Kafka 协同工作的优秀地流解决方案,Samza 已经是 LinkedIn 内部关键的基础设施之一。Samza 重负依赖于 Kafaka 的基于日志的机制,二者结合地非常好。Samza 提供了 Compositional 接口,并且也支持 Scala。

  • 最后聊聊 Flink. Flink 可谓一个非常老的项目了,最早在 2008 年就启动了,不过目前正在吸引越来越多的关注。Flink 也是一个 Native Streaming 的系统,并且提供了大量高级别的 API。Flink 也像 Spark 一样提供了批处理的功能,可以作为流处理的一个特殊案例来看。Flink 强调万物皆流,这是一个绝对的更好地抽象,毕竟确实是这样。

下表就简单列举了上述几个框架之间的特性:

Counting Words

Wordcount 就好比流处理领域的 HelloWorld,它能够很好地描述不同框架间的差异性。首先看看 Storm 是如何编写 WordCount 程序的:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
...
Map<String, Integer> counts = new HashMap<String, Integer>();
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
counts.put(word, count);
collector.emit(new Values(word, count));
}

首先来看看它的拓扑定义,在第 2 行那边是定义了一个 Spout,也就是一个输入源。然后定义了一个 Bold,也就是一个处理的组件,用于将某个句子分割成词序列。然后还定义了另一个 Bolt 用来负责真实的词计算。5,8 到 12 行省略的过程用于定义集群中使用了多少个线程来供每一个组件使用。如你所见,所有的定义都是比较底层的与手动的。接下来继续看看这个 8-15 行,也就是真正用于 WordCount 的部分代码。因为 Storm 没有内建的状态处理的支持,所以我必须自定义这样一个本地状态,和理想的相差甚远啊。下面我们继续看看 Trident。 正如我上文中提及的,Trident 是一个基于 Storm 的 Micro-Batching 的扩展,它提供了状态管理等等功能。

public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = ...
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
.each(new Fields("sentence"),new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"));
...
}

从代码中就可以看出,在 Trident 中就可以使用一些上层的譬如eachgroupBy这样的操作符,并且可以在 Trident 中内建的进行状态管理了。接下来我们再看看 Spark 提供的声明式的接口,要记住,与前几个例子不同的是,基于 Spark 的代码已经相当简化了,下面基本上就是要用到的全部的代码了:

val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))
val text = ...
val counts = text.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()

每个 Spark 的流任务都需要一个StreamingContext用来指定整个流处理的入口。StreamingContext定义了 Batch 的间隔,上面是设置到了 1 秒。在 6-8 行即是全部的词统计的计算过程,非常不一样啊。下面再看看 Apache Samza,另一个代表性的组合式的 API:

class WordCountTask extends StreamTask {
override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
coordinator: TaskCoordinator) {
val text = envelope.getMessage.asInstanceOf[String]
val counts = text.split(" ").foldLeft(Map.empty[String, Int]) {
(count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
}
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts))
}

Topology 定义在了 Samza 的属性配置文件里,为了明晰起见,这里没有列出来。下面再看看 Fink,可以看出它的接口风格非常类似于 Spark Streaming,不过我们没有设置时间间隔:

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(...)
val counts = text.flatMap ( _.split(" ") )
.map ( (_, 1) )
.groupBy(0)
.sum(1)
counts.print()
env.execute("wordcount")

Fault Tolerance

与批处理系统相比,流处理系统中的容错机制固然的会比批处理中的要难一点。在批处理系统中,如果碰到了什么错误,只要将计算中与该部分错误关联的重新启动就好了。不过在流计算的场景下,容错处理会更加困难,因为会不断地有数据进来,并且有些任务可能需要 7*24 地运行着。另一个我们碰到的挑战就是如何保证状态的一致性,在每天结束的时候我们会开始事件重放,当然不可能所有的状态操作都会保证幂等性。下面我们就看看其他的系统是怎么处理的:

Storm

Storm 使用了所谓的逆流备份与记录确认的机制来保证消息会在某个错误之后被重新处理。记录确认这一个操作工作如下:一个操作器会在处理完成一个记录之后向它的上游发送一个确认消息。而一个拓扑的源会保存有所有其创建好的记录的备份。一旦受到了从 Sinks 发来的包含有所有记录的确认消息,就会把这些确认消息安全地删除掉。当发生错误时,如果还没有接收到全部的确认消息,就会从拓扑的源开始重放这些记录。这就确保了没有数据丢失,不过会导致重复的 Records 处理过程,这就属于 At-Least 投送原则。 Storm 用一套非常巧妙的机制来保证了只用很少的字节就能保存并且追踪确认消息,但是并没有太多关注于这套机制的性能,从而使得 Storm 有较低地吞吐量,并且在流控制上存在一些问题,譬如这种确认机制往往在存在背压的时候错误地认为发生了故障。

Spark Streaming

Spark Streaming 以及它的 Micro-Batching 机制则使用了另一套方案,道理很简单,Spark 将 Micro-Batches 分配到多个节点运行,每个 Micro-Batch 可以成功运行或者发生故障,当发生故障时,那个对应的 Micro-Batch 只要简单地重新计算即可,因为它是持久化并且无状态的,所以要保证 Exactly-Once 这种投递方式也是很简单的。

Samza

Samza 的实现手段又不一样了,它利用了一套可靠地、基于 Offset 的消息系统,在很多情况下指的就是 Kafka。Samza 会监控每个任务的偏移量,然后在接收到消息的时候修正这些偏移量。Offset 可以是存储在持久化介质中的一个检查点,然后在发生故障时可以进行恢复。不过问题在于你并不知道恢复到上一个 CheckPoint 之后到底哪个消息是处理过的,有时候会导致某些消息多次处理,这也是 At-Least 的投递原则。

Flink 主要是基于分布式快照,每个快照会保存流任务的状态。链路中运送着大量的 CheckPoint Barrier(检查点障碍,就是分隔符、标识器之类的),当这些 Barrier 到达某个 Operator 的时候,Operator 将自身的检查点与流相关联。与 Storm 相比,这种方式会更加高效,毕竟不用对每个 Record 进行确认操作。不过要注意的是,Flink 还是 Native Streaming,概念上和 Spark 还是相去甚远的。Flink 也是达成了 Exactly-Once 投递原则。

Managing State

大部分重要的流处理应用都会保有状态,与无状态的操作符相比,这些应用中需要一个输入和一个状态变量,然后进行处理最终输出一个改变了的状态。我们需要去管理、存储这些状态,要保证在发生故障的时候能够重现这些状态。状态的重造可能会比较困难,毕竟上面提到的不少框架都不能保证 Exactly-Once,有些 Record 可能被重放多次。

Storm

Storm 是实践了 At-Least 投递原则,而怎么利用 Trident 来保证 Exactly-Once 呢?概念上还是很简单的,只需要使用事务进行提交 Records,不过很明显这种方式及其低效。所以呢,还是可以构建一些小的 Batches,并且进行一些优化。Trident 是提供了一些抽象的接口来保证实现 Exactly-Once,如下图所示,还有很多东西等着你去挖掘。

Spark Streaming

当想要在流处理系统中实现有状态的操作时,我们往往想到的是一个长时间运行的 Operator,然后输入一个状态以及一系列的 Records。不过 Spark Streaming 是以另外一种方式进行处理的,Spark Streaming 将状态作为一个单独地 Micro-Batching 流进行处理,所以在对每个小的 Micro-Spark 任务进行处理时会输入一个当前的状态和一个代表当前操作的函数,最后输出一个经过处理的 Micro-Batch 以及一个更新好的状态。

Samza

Samza 的处理方式更加简单明了,就是把它们放到 Kafka 中,然后问题就解决了。Samza 提供了真正意义上的有状态的 Operators,这样每个任务都能保有状态,然后所有状态的变化都会被提交到 Kafka 中。在有需要的情况下某个状态可以很方便地从 Kafka 的 Topic 中完成重造。为了提高效率,Samza 允许使用插件化的键值本地存储来避免所有的消息全部提交到 Kafka。这种思路如下图所示,不过 Samza 只是提高了 At-Least 这种机制,未来可能会提供 Exactly-Once。

Flink 提供了类似于 Samza 的有状态的 Operator 的概念,在 Flink 中,我们可以使用两种不同的状态。第一种是本地的或者叫做任务状态,它是某个特定的 Operator 实例的当前状态,并且这种状态不会与其他进行交互。另一种呢就是维护了整个分区的状态。

Counting Words with State

Trident

public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = ...
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
.each(new Fields("sentence"),new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
...
}

在第 9 行中,我们可以通过调用一个持久化的聚合函数来创建一个状态。

Spark Streaming

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])
val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
Some(output)
}
val stateDstream = wordDstream.trackStateByKey(
StateSpec.function(trackStateFunc).initialState(initialRDD))

在第 2 行中,我们创建了一个 RDD 用来保存初始状态。然后在 5,6 行中进行一些转换,接下来可以看出,在 8-14 行中,我们定义了具体的转换方程,即输入时一个单词、它的统计数量和它的当前状态。函数用来计算、更新状态以及返回结果,最后我们将所有的 Bits 一起聚合。

Samza

class WordCountTask extends StreamTask with InitableTask {
private var store: CountStore = _
def init(config: Config, context: TaskContext) {
this.store = context.getStore("wordcount-store")
.asInstanceOf[KeyValueStore[String, Integer]]
}
override def process(envelope: IncomingMessageEnvelope,
collector: MessageCollector, coordinator: TaskCoordinator) {
val words = envelope.getMessage.asInstanceOf[String].split(" ")
words.foreach { key =>
val count: Integer = Option(store.get(key)).getOrElse(0)
store.put(key, count + 1)
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"),
(key, count)))
}
}

在上述代码中第 3 行定义了全局的状态,这里是使用了键值存储方式,并且在 5~6 行中定义了如何初始化。然后,在整个计算过程中我们都使用了该状态。

val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )
words.keyBy(x => x).mapWithState {
(word, count: Option[Int]) =>
{
val newCount = count.getOrElse(0) + 1
val output = (word, newCount)
(output, Some(newCount))
}
}

在第 6 行中使用了mapWithState函数,第一个参数是即将需要处理的单次,第二个参数是一个全局的状态。

Performance

合理的性能比较也是本文的一个重要主题之一。不同的系统的解决方案差异很大,因此也是很难设置一个无偏的测试。通常而言,在一个流处理系统中,我们常说的性能就是指延迟与吞吐量。这取决于很多的变量,但是总体而言标准为如果单节点每秒能处理 500K 的 Records 就是个合格的,如果能达到 100 万次以上就已经不错了。每个节点一般就是指 24 核附带上 24 或者 48GB 的内存。 对于延迟而言,如果是 Micro-Batch 的话往往希望能在秒级别处理。如果是 Native Streaming 的话,希望能有百倍的减少,调优之后的 Storm 可以很轻易达到几十毫秒。 另一方面,消息的可达性保证、容错以及状态管理都是需要考虑进去的。譬如如果你开启了容错机制,那么会增加 10%到 15%的额外消耗。除此之外,以文章中两个 WordCount 为例,第一个是无状态的 WordCount,第二个是有状态的 WordCount,后者在 Flink 中可能会有 25%额外的消耗,而在 Spark 中可能有 50%的额外消耗。当然,我们肯定可以通过调优来减少这种损耗,并且不同的系统都提供了很多的可调优的选项。 还有就是一定要记住,在分布式环境下进行大数据传输也是一件非常昂贵的消耗,因此我们要利用好数据本地化以及整个应用的序列化的调优。

Project Maturity(项目成熟度)

在为你的应用选择一个合适的框架的时候,框架本身的成熟度与社区的完备度也是一个不可忽略的部分。Storm 是第一个正式提出的流处理框架,它已经成为了业界的标准并且被应用到了像 Twitter、Yahoo、Spotify 等等很多公司的生产环境下。Spark 则是目前最流行的 Scala 的库之一,并且 Spark 正逐步被更多的人采纳,它已经成功应用在了像 Netflix、Cisco、DataStax、Indel、IBM 等等很多公司内。而 Samza 最早由 LinkedIn 提出,并且正在运行在几十个公司内。Flink 则是一个正在开发中的项目,不过我相信它发展的会非常迅速。

Summary

在我们进最后的框架推荐之前,我们再看一下上面那张图:

Framework Recommendations

这个问题的回答呢,也很俗套,具体情况具体分析。总的来说,你首先呢要仔细评估下你应用的需求并且完全理解各个框架之间的优劣比较。同时我建议是使用一个提供了上层接口的框架,这样会更加的开发友好,并且能够更快地投入生产环境。不过别忘了,绝大部分流应用都是有状态的,因此状态管理也是不可忽略地一个部分。同时,我也是推荐那些遵循 Exactly-Once 原则的框架,这样也会让开发和维护更加简单。不过不能教条主义,毕竟还是有很多应用会需要 At-Least-Once 与 At-Most-Once 这些投递模式的。最后,一定要保证你的系统可以在故障情况下很快恢复,可以使用 Chaos Monkey 或者其他类似的工具进行测试。在我们之前的讨论中也发现这个快速恢复的能力至关重要。

  • 对于小型与需要快速响应地项目,Storm 依旧是一个非常好的选择,特别是在你非常关注延迟度的情况下。不过还是要谨记容错机制和 Trident 的状态管理会严重影响性能。Twitter 目前正在设计新的流计算系统 Heron 用来替代 Storm,它可以在单个项目中有很好地表现。不过 Twitter 可不一定会开源它。

  • 对于 Spark Streaming 而言,如果你的系统的基础架构中已经使用了 Spark,那还是很推荐你试试的。另一方面,如果你想使用 Lambda 架构,那 Spark 也是个不错的选择。不过你一定要记住,Micro-Batching 本身的限制和延迟对于你而言不是一个关键因素。

  • 如果你想用 Samza 的话,那最好 Kafka 已经是你的基础设施的一员了。虽然在 Samza 中 Kafka 只是个可插拔的组件,不过基本上所有人都会使用 Kafka。正如上文所说,Samza 提供了强大的本地存储功能,能够轻松管理数十 G 的状态数据。不过它的 At-Least-Once 的投递限制也是很大一个瓶颈。

  • Flink 目前在概念上是一个非常优秀的流处理系统,它能够满足大部分的用户场景并且提供了很多先进的功能,譬如窗口管理或者时间控制。所以当你发现你需要的功能在 Spark 当中无法很好地实现的时候,你可以考虑下 Flink。另外,Flink 也提供了很好地通用的批处理的接口,只不过你需要很大的勇气来将你的项目结合到 Flink 中,并且别忘了多关注关注它的路线图。

Dataflow 与开源

我最后一个要提到的就是 Dataflow 和它的开源计划。Dataflow 是 Google 云平台的一个组成部分,是目前在 Google 内部提供了统一的用于批处理与流计算的服务接口。譬如用于批处理的 MapReduce,用于编程模型定义的 FlumeJava 以及用于流计算的 MillWheel。Google 最近打算开源这货的 SDK 了,Spark 与 Flink 都可以成为它的一个运行驱动。

Conclusion

本文我们过了一遍常用的流计算框架,它们的特性与优劣对比,希望能对你有用吧。