Making Sense Of Stream Processing 以流式的思维,重新审视了当前互联网产品的技术架 构。如今面临的问题,如缓存一致性、读写性能的横向扩展、服务之间依赖的控制,在当前 的框架下,是可以有效解决的吗?如果我们来重新设计,又可以采用什么样的架构来更好的 解决这些问题呢?请看重新理解流式处理。

流处理与业务

Stream Processing有很多来源不同的近义词,Complex Event Processing,Event Sourcing等等。CEP来自于模拟软件。Event Sourcing来自于DDD,应对企业应用开发中相对 互联网公司较小的数据量级和甚为复杂的数据模型的场景。

Google Analysis是一个很好的Stream Processing的例子。Google Analysis会记录用户的 行为,以Event的形式发送到服务器。流处理有两种方法处理这些Event:

    1. 以Raw Event的形式存储所有的Event。查询的时候,扫描所有的event,得出结果。
    1. 不存储Event,而存储将Event聚合得出的结果。

以a方式存储,有最大的分析灵活性。而b方式更适合需要实时的情景。虽然看起来A方法花 费众多,Google其实是还是会存储所有的信息。

而Event Sourcing更多关注的是应该如何安排数据。以购物车场景为例,当用户更改购物车 内的某个商品的数量时,传统的方法是update之前的数据。DDD认为,不应该记录状态,而 应该将状态的变更作为不可变事件存储下来,可以保留更多的历史信息。

这两种形式本质上都是两个场景:

  • 以Raw Event的形式写。
  • 以Aggregate Summaries的方式读。

写Raw Event是最简单的,只需要不断的追加日志。读Aggregate Summaries是符合直觉的, 用户希望能够看到最终的状态,而不是全部历史。因此分离读和写是有意义的,这种理念也 被称作CQRS(command-query responsibility segregation)。

以几个场景为例:

  • Twitter。用户发送一条微博很简单。但是想找出一个用户关注的所有用户的最近的100条 tweet,SQL解决这个问题耗费将会巨大,且很难扩展。需要方法能够在用户查看自己的主 页之前计算好用户能看到的tweet。
  • Facebook。用户点击Like按钮简单,呈现的结果(动态,多少人Like了这条动态,包括哪 些人,多少人转发,评论?)非常复杂。和Twitter有同样地问题。
  • Wikipedia。不同于以上,Wikipedia中用户提交的内容和看到的内容几乎是一样的,是否 可以只提交diff优化性能?Google Doc是这样做的,粒度在一个字符。
  • LinkedIn。当用户更新了自己的简历时,如何保证能更新用户的索引呢?需要机制来更新 复杂的索引。

Event Sourcing来解决这些问题。将事件以事件流的形式保存下来,我们就获得了真理之源 (Source of Truth),通过对事件流不同的消费方式,我们可以:

  • 将事件流存储到Hadoop来做离线分析。
  • 更新全文索引。
  • 更新缓存。
  • 甚至将一个流的输出接入到另一个流的输入中。

这样的好处是什么呢?

  • 解耦合。读和写的Schema不必再一样,加入中间层之后,系统的各部分的耦合度大大降低。
  • 读写性能。读和写Schema如果一样时,通常读写性能只能优化其中一个。读写分离之后, 读和写都可以很快。
  • 扩展性。事件流抽象模型简单,利于并行和扩展。读写分离之后,系统可以显式的分为 Producer和Consumer,可以更好地利用硬件的并发性能。
  • 灵活性和敏捷性。不再需要Schema migration。通过加入新的Consumer,也可以快速的实 验新的接口,同时不会影响旧接口。
  • 利于解决错误场景。有Bug时,重新处理一遍Event就可以获得正确的数据。

我们已经有了这样的工具:

  • Kafka
  • Flink
  • CEP。在Event Stream中不断寻找满足设定pattern的Event,并通知用户(complex event), 适用于反作弊等等。
  • Stream SQL
  • Actor framework
  • Reactive
  • Change Data Capture (CDC)

日志作为应用的核心存储

一切应用都在无可避免的走向复杂。以Web应用为例,一个最简单的Web应用最开始只是读写 SQL数据库,用户增多后,可能会加入缓存来增快访问速度。用户需要查询某些信息,又加 入了ES等索引引擎。应用需要给用户异步的发送通知,用到了消息队列。再之后,Hadoop等 分析工具也有了需求。一个Web应用在发展壮大的过程中无可避免的走向了复杂。

越来越多的数据组件,将数据复制了若干份。这个过程中,核心的问题是:如何保证各个组 件中的数据是一致的。

最常见的方法是双写(dual write)。如果我们要更新数据库,那么对应的缓存中的数据也应 该能够正确的更新。双写即前后更新两个数据组件:写入数据库,然后更新缓存。

双写的好处是简单直观,所以被广泛应用。不幸的是,双写有两个很严重的问题:

  • 易出现时序问题(race condition)。例如一段逻辑中要先后写A,B两个存储,同时有x,y 两个请求,可能会出现(xA, yA, yB,xB)类似的时序,导致最终的结果是yA,xB,A和B存 储是最终不一致的。
  • partial failure无法保证原子性。例如写A和B,当A写成功,B写失败时,同样会造成数 据不一致的情况。这种情况虽然可以通过Transaction避免,但是对于没有实现 Transaction的NoSQL数据库是无能为力的。此外,当A和B是不同类型的存储时,同样很难 搞定。

解决这个问题的最傻瓜方法:日志。日志是极为简单,又极为强大的工具。日志的应用有:

  • 关系型数据库中使用WAL保证数据的持久性,保证索引和存储一致。
  • Replication使用Log,来广播Master中的数据。
  • 一致性协议中,达成一致的是Log。
  • 当然,Kafka的存储原理也是Log。

Log有一些很棒的性质。首先,Log的写是Append only的,你只能在Log的结尾追加数据,可 以达到很高的写吞吐。顺序读同样极快。其次,Log可以维护顺序。Kafka相比AMQP的重要属 性是,Kafka的单partition消费是严格顺序的,每一个Consumer Group只维护了当前消费到 的Offset,而AMQP则对每条消息维护了ACK。当然,对于不需要顺序的任务来说(例如,发 送通知), AMQP有自己的优势。

回到之前的问题:Log如何保证各个组件中的数据是一致的?避免双写是必须的,双写会带 来永久性的不一致。我们只需要所有的写操作都只写入日志,每一个数据源拥有自己独立的 Consumer来消费Log,写入自己的存储。

前面提到的时序问题,由于Log天然的顺序性,每一个Consumer Group均按照同样地顺序来 消费日志,最终会到达同样的状态。

第二个问题,当某一个组件临时性的故障时,写入会自然暂停。当组件恢复之后,又可以从 之前的位点继续开始消费,同样可以保证应用的各个组件之前状态是一致的。

写Log意味着,系统中所有的写操作都将是异步的,可能会造成一些问题。举例来说,用户 系统希望每一个用户名都是独一无二的。在异步的场景下,我们怎么解决这个问题?

一种方法是下文将要提到的捕获数据变更(Changed Data Capture,下称CDC)。

还有一种更为简单的方法。当直接写数据库时,我们会使用transaction和unique constriant来解决并发写问题。当不满足条件时,transaction会回滚。使用Log有类似的机 制。当有多个用户同时注册时,我们可以为每一个用户写入一条Claim Username的消息。这 个消息不保证用户名一定会落库,只是用来确定顺序。消费者收到Claim之后,就可以先检 查数据库,如果用户名已经存在,就发出一条Registration Failed的消息,不存在则写入 数据库,发出Registration Success的消息。只要保证同样的用户名必定会落在同一个 Partition上(可以将Username作为Partition Key),甚至不需要使用transaction,因为 不存在并发的问题。将Partition的数量提高,可以进一步提高处理的速度。

那么同步的写请求如何知道是否用户名可用呢?写请求可以同步的监听Registraion Success的消息。通常这些消息若干毫秒即可返回。如果冲突足够少,甚至可以先告诉用户 注册成功。如果失败了,则为用户分配一个临时的用户名,并提醒用户修改名称。

试想一下一个只存在GET接口的系统,所有的写都将通过Log。我们怎么样设计这样一个系统 呢?

捕获数据变更

将整个应用迁移到日志流上是非常激进的变化。通过CDC,我们可以改造现有的使用数据库 的应用,并且享受到日志流的好处。

CDC(Changed Data Capture)的理念是,将数据库的变更以日志的形式体现,提供给外部的 组件使用。当前的数据库已经提供了Trigger的机制,当数据出现变更时,就会触发用户定 义的行为。但是这种Trigger维护在数据库内,不易使用,也对性能不利。

我们可以把表看成一条条Log状态更新的快照。之前提到,关系型数据库同样会使用Log来保 证写入的持久性,来进行主从同步,如果把我们的其他数据组件也当作一个从库,读取这些 Log来更新自己的数据呢?不幸的是,大部分数据库都将这些Log作为内部数据结构使用,不 对外开放。一些公司选择开发自己的数据库来利用CDC,如LinkedIn的Databus,Facebook的 Wormhole。

目前,大多数数据库都会提供导出SNAPSHOT的能力,并且支持热导出,即导出某一份快照时, 同时允许新的写请求。PostgreSQL某一版后提供了Logical Decoding的功能,可以将WAL解 析成行的变更。

本书作者利用这个功能,开发了Bottled Water库。Bottled Water需要侵入PostgreSQL内。 初始化时,Bottled Water会获取数据表的当前SNAPSHOT,将每行各转为一条Kafka消息,写 入Kafka内。当表改变时,BW会解析WAL的变更,将变更行的内容同样转换成一个事件。这样, 从头到尾,读Kafak内所有的事件,就可以还原出表的完整状态。其他的数据组件就可以监 听这些事件,来同步数据库。

Kafka天然适合当作CDC的管道。Kafka有Log Compaction的选项。当开启后,Kafak就会关闭 按照时间的垃圾回收,转而定时的合并相同Key的数据。Bottled Water将每行的Primary Key作为事件的Key。创建、更新行时都会发一个新的同样Key的事件,删除时会发一个同样 Key的值为null的新事件。每次合并时只保留最新的Key。

Bottled Water采用Avro作为序列化的方式。相比于json、protobuf、thrift,Arvo在保持 体积足够小的同时,更适宜schema的变更。

PostgreSQL只有当Transaction结束时才会暴露WAL,所以Bottled Water不需要处理中止 Transaction的情况。

分布式数据的Unix哲学

不妨回顾一下Unix哲学。Unix哲学,一为“只做一件事、做好一件事”,一为 “他人与我互为输入输出”。各类工具相互配合,可有千般变化,只因接口之统一。文件为最 通用之抽象,上层程序,虽形态各异,也须遵循此道。

DBMS另辟蹊径。SQL层隐藏各类细节,函数方法不依赖于他人,亦不向他人提供,譬如排序、 去重等等,必体现于SQL之中,执行在引擎之内。此所谓单体软件(Monolithic)。由彼之流行, 亦可谓可行之道也。

然时代更替,需求渐长,关系型数据库难再独挑存储大梁。如缓存,全文索引等,各自存储均 须与其保持最终一致性。如前文所述,Log为解决问题之最佳工具,怎奈被雪藏于存储引擎 之内,单体软件弊端亦显。

Kafka由是横空出世,承Unix哲学衣钵。Kafka与Unix程序哲学共通之处:统一接口。日志乃 多组件之间共通之接口。无此接口,组件之间相互依赖,关系庞杂。有此接口,各组件与此 接口分别制定协议,Kafka为全系统数据之根基。

Kafka有Unix Pipe不能及之处,如:

  • Kafka是分布式系统,设置得当,单一节点宕机不会影响系统可用性。
  • Kafka将日志写入磁盘,一旦确认写入,节点宕机不会影响数据完整性。Unix程序多将日 志放入内存中,易失。
  • Kafka以消息为单位,相比于Unix管道仅提供二进制流,更易操作。
  • Kafka日志,可有多个订阅方,Unix Pipe只可供一方使用。
  • Kafka有磁盘为缓存,读写可分离。Unix Pipe 一旦Buffer填满,将不得不反压。
  • Kafka中数据流有Topic定名,Unix Pipe无此功能。

但究其本质,Kafka没有超出Unix Pipe的思想范畴,可以理解为分布式版本的Unix哲学的体现。