本文属于 MillWheel 论文的阅读与翻译稿,部分地方使用自己的语言进行描述,部分地方进行翻译(附带原文)
讲述 MillWhell 的编程模型以及具体实现。MillWhell 的逻辑时间(logical time)使得开发基于时间的聚合应用更方便,从设计之初,MillWhell 就考虑了容错性以及扩展性。

Introduction

在 Google,流处理系统需要做到 fault tolerance,persistent state 以及 scalability。

MillWhell 是为了流处理,低延迟而量身定做的(tailored specifically to streaming, low-latency systems).

MillWhell 的 API 保证幂等性,从用户角度来看就是保证了恰好一次的语义。
Furthermore, the API that MillWheel provides for record processing handles each record in an idempotent fashion, making record delivery occur exactly once from the user’s perspective.

Motivation and Requirements

需要实时的分析搜索的高峰和波谷(spike and dip)
现在的做法,缓存 1 秒的 batch,然后使用模型预测与真实数据进行比较,如果相差很大,就有大概率出现了高峰/波谷。

存储:高峰/波谷 的持续时间不固定,因此需要保存一定的时间

Low Watermarks:需要区分「数据延迟」 和 「没有数据」,这样也能容忍数据的晚到(真实世界的数据是乱序的)

恰好一次:至少一次(at least once)可能导致波峰的出现

requirements of MillWhell:

  • 低延迟(Data should be available to consumers as soon as it is published i.e. there are no system-intrinsic barriers to ingesting inputs and providing output data).
  • 提供易用的状态保存抽象接口(Persistent state abstractions should be available to user code, and should be integrated into the system’s overall consistency model.)
  • 处理乱序(Out-of-order data should be handled gracefully by the system.)
  • 系统提供单调递增的 low watermark(A monotonically increasing low watermark of data timestamps should be computed by the system.)
  • 可扩展性好(Latency should stay constant as the system scales to more than machines.)
  • 恰好一次的语义(The system should provide exactly-once delivery of records)

System Overview

MillWheel 是一个基于用户定义逻辑以及输入输出数据的图(MillWheel is a graph of user-defined transformations on input data that produces output data.)
这些转换被称为计算,计算需要能够在框架层面做好负载均衡的分布式处理(用户不需要关系)

MillWheel 的输入输出组有 (key, value, timestamp) 三元组组成,其中 key 是系统内部有意义的元数据结构,value 表示真实数据的字节数组,可以是任何的值,timestamp 可以是用户设定的任何值(一般和墙上时间近似)MillWheel 会根据三元组进行 low watermark 的具体计算

用户的计算会形成一套具体的数据流图(a pipeline of user computations will form a data flow graph, as outputs from one computation become inputs for another, and so on.)

用户可以在不重启系统的情况下,动态的添加计算逻辑(Users can add and remove computations from a topology dynamically, without needing to restart the entire system.)

MillWhell 对每条数据提供幂等的处理(MillWheel makes record processing idempotent with regard to the framework API. As long as applications use the state and communication abstractions provided by the system, failures and retries are hidden from user code.)

在计算过程中,用户代码可以放到到每个 key 以及每个 computation 对应的存储内容

Core Concepts

MillWheel 提供了流计算的基本元素,同时提供了清晰的抽象(MillWheel surfaces the essential elements of a streaming system, while providing clean abstractions.)

Computations

用户逻辑定义在 computation 中,提供了用户代码的封装(Application logic lives in computations, which encapsulate arbitrary user code.)

MillWhell 会在接受到数据的时候触发用户逻辑,包括连接外部系统,操作其他的 MillWheel 计算以及进行输出(Computation code is invoked upon receipt of input data, at which point user-defined actions are triggered, including contacting external systems, manipulating other MillWheel primitives, or outputting data.)

需要用户保证外部系统的幂等性(If external systems are contacted, it is up to the user to ensure that the effects of their code on these systems is idempotent.)

Computation 应该针对单个 key 进行,而且不能对 key 在不同机器上的分布有预期(Computation code is written to operate in the context of a single key, and is agnostic to the distribution of keys among different machines.)

保证同一个 key 之间是顺序执行的,不同的 key 可以并行执行。

Keys

在 MillWheel 中 key 是用于聚合和比较的核心抽象(Keys are the primary abstraction for aggregation and comparison between different records in MillWheel)
每条记录会根据用户的规则分配一个具体的 key,computation 的逻辑运行在特定的 key 的空间内,同时也只能访问对应 key 的状态数据

Streams

Stream 是 MillWheel 系统中任意两个 computation 中间的转换机制(Streams are the delivery mechanism between different computations in MillWheel.)

每个 computation 会订阅 0 个或多个 stream,同时会输出到至少 1 个 stream,不同 computation 间的数据流转由系统进行保证(A computation subscribes to zero or more input streams and publishes one or more output streams, and the system guarantees delivery along theses channels)

Persistent State

在 MillWheel 系统中,状态是一个不透明的字节数组,由用户提供序列化和反序列化的逻辑。状态数据的存储系统有 BigTable/Spanner 提供。

Low Watermarks

low watermark 提供了未来数据最小时间戳的界限(The low watermark for a computation provides a bound on the timestamp of future records arriving at that computation)

定义: low watermark 的定义通过数据流水线递归进行给出(We provide a recursive definition of low watermarks based on a pipeline’s data flow),给定一个 computation,A,它最老的工作由正在做的工作中,timestamp 最小的工作给出(Given a computation, A, let the oldest work of A be a timestamp corresponding to the oldest unfinished (in-flight, stored, or pending-delivery) record in A.). 在这个定义下 A 的 low watermark 可以写成 min(oldest work of A, low watermark of C: C outputs to A),也就是所有上游的 low watermark 以及自己的最老(不一定是最早开始的)工作的时间

low watermark 由外部系统进行打点(Low watermark values are seeded by injector, which send data into MillWheel from external systems)

在真实世界中,对外部系统的延迟往往是一个估计值,因此需要能够容忍一小部分落后与 low watermark 的数据

用户可以假定自己获取的数据是全量的(这个假定对 low watermark 很重要)(By waiting for the low watermark of a computation to advance past a certain value, the user can determine that they have a complete picture of their data up to that time)

用户可以给每条记录赋予一个不小于输入源数据时间戳的任意值(When assigning timestamps to new or aggregate records, it is up to the user to pick a timestamp no smaller than any of the source records.)

Timers

Timers 是一段针对每个 key 的定时触发(基于墙上时间或者 low watermark)的回调逻辑(Timers are per-key programmatic hooks that trigger at a specific wall time or low watermark value.)

Timer 一旦设定,将由框架保证按时间戳递增的触发(Once set, timers are guaranteed to fire in increasing timestamp order)

Timer 会有记录日志以及相应状态保存,同样提供恰好一次的语义(They – Timers – are journaled in persistent state and can survive process restarts and machine failures. When a timer fires, it runs the specified user function and has the same exactly-once guarantee as input records)

API

1
2
3
4
5
6
7
8
9
10
class Computation {
//Hooks called by the system
void ProcessRecord(Record data);
void ProcessTimer(Timer timer);
//Accessors for other abstractions.
void SetTimer(String tag, int64 time);
void ProduceRecord(Record data, String stream);
StateType MutablePersistentState();
}

用户通过重载 ProcessRecordProcessTimer 来做具体的工作

Computation API

两个用户逻辑的入口是 ProcessRecordProcessTimer(The two main entry point into user code are provided by the ProcessRecord and ProcessTimer hooks)

系统接受到外部 RPC 之后(输入)会调用用户逻辑,用户逻辑可以访问状态数据,然后系统将输出传给下游(The MillWheel system invokes user-defined processing hooks in response to incoming RPCs. User code accesses state, timers, and productions through the framework API. The framework performs any actual RPCs and state modifications)

Injector and Low Watermark API

系统会为每个 Computation 中待处理的任务计算一个 low watermark(At the system layer, each computation calculates a low watermark value for all of its pending work(in-progress and queued deliveries).

所有的状态也都可以被赋予一个时间戳(Persistent state can also be assigned a timestamp value(e.g. the trailing edge of an aggregation window.) 这都由系统自动建立,从而为用户提供透明的 timer 语义 – 用户一般不需要和底层的 low watermark 进行交互,而是通过时间戳进行操作 low watermark(This is rolled up automatically by the system in order to provide API semantics for timers in a transparent way – users rarely interact with low watermarks in computation code, but rather manipulate them indirectly through timestam assignation to records.)

Injectors: Injectors 从外部系统读入数据到 MillWheel(Injectors bring external data into MillWheel)
Injector 负责最初的 low watermark 生成,因此可以间隙性的往下游发送传递的 low watermark 表示当前传递的情况(Since injectors seed low watermark values for the rest of the pipeline, they are able to publish an injector low watermark that propagates to any subscribers among their output streams, reflecting their potential deliveries along those stream.

一个 Injector 可能会由多个进程处理,因此 injector 的 low watermark 由所有这些进程共同决定(这些进程中最小的 low watermark)(An injector can be distributed across multiple processes, such that the aggregate low watermark of these processes is used as the injector low watermark.)

用户可以自定义一组预期的输出进程,用于度量系统的健壮性以及网络的延迟情况(The user can specify an expected set of injector processes, making this metric robust against process failures and network outages.)

Fault tolerance

Delivery Guarantees

MillWheel 编程模型的简洁性通过由框架提供幂等操作,而不需要用户自己实现幂等操作来完成。这可以大大减少用户的编程复负担。(Much of the conceptual simplicity of MillWheel’s programming model hinges upon its ability to take non-idempotent user code and run it as if it were idempotent. By removing this requirement from computation authors, we relieve them of a significant implementation burden.)

恰好一次(Exactly-Once Delivery)

每个 computation 接受到数据后的处理步骤如下:

  • 数据会和上次的去重数据进行比较,如果有重复则跳过(The record is checked against deduplication data from previous deliveries; duplicates are discarded.)
  • 执行用户代码,生成针对 timer,state,production 的变化(User code is run for the input record, possibly resulting in pending changes to timers state, and productions.)
  • 将上一步的变化保存到后端存储(pending changes are committed to the backing store)
  • ACK 上游(Senders are ACKed)
  • 发送给下游(Pending downstream productions are sent)

作为优化,可能某个 checkpoint 点会针对多条记录进行(As an optimization, the above operations may be coalesced into a single checkpoint for multiple records.)

MillWheel 中的数据在被 ACK 之前会一直重发,从而保证 At-Least-Once,这是 Exactly-Once 的前提(Deliveries in MillWheel are retried until they are ACKed in order to meet our at-least-once requirement, which is a prerequisite for exactly-once.)

这就引入了如下一个问题,如果下游在正常更新状态后,ACK 前退出了,就会接受到两条同样的数据,因此需要对数据进行去重(However, this introduces the case where a receiver may crash before it has a chance to ACK the input record, even if it has persisted the state corresponding to successful processing of that record. In this case, we must prevent duplicate processing when the sender retries its delivery.)

MillWheel 给每条产生的数据赋予一个全局唯一的 ID,利用原子操作的状态更新包含该 ID 来进行去重操作(The system assigns unique ID to all records at production time. We identify duplicate records by including this unique ID for the record in the same atomic write as the state modification.)

如果接受到已经成功处理的消息,系统比较后直接丢弃并 ACK(防止无休止的重试)(If the same record is later retried, we can compare it to the journaled ID, and discard and ACK the duplicate (lest it continue to retry indefinitely)

由于内存有限,不会将所有处理过的数据保存在内存,因此保存一个 Bloomfilter 保存我们见过的数据,如果 Bloomfiler 返回结果为空,则需要读取后端存储检查是否为重复数据。Record ID 会在确保所有发送方都已经重试完成后进行回收(Since we cannot necessarily store all duplication data in-memory, we maintain a Bloom filter of known record fingerprints, to provide a fast path for records that we must read the backing store to determine whether a record is a duplicate. Record IDs for past deliveries are garbage collected after MillWheel can guarantee that all internal senders have finished retrying.) 对于 Injector,系统会延迟回收 ID(For injectors that frequently deliver late data, we delay this garbage collection by a corresponding slack value (typically on the order of a few hours).

Strong productions

由于 MillWheel 处理的数据可能是乱序以及不确定性的,因此会在数据产生后,发往下游以及更新状态数据前进行 checkpoint(Since MillWheel handles inputs that are not necessarily ordered or deterministic, we checkpoint produced records before delivery in the same atomic write as state modification.)

这种在记录产生之前进行 checkpoint 的模式被称为 strong production(We call this pattern of checkpointing before record production strong productions.)

如果没有 checkpoint 的话,可能导致同一个逻辑窗口的数据不一致(Without a checkpoint, it would be possible for that computation to produce a window count downstream, but crash before saving its state. Once the computation came back up, it might receive another record (and add it to the count) before producing the same aggregate, creating a record that was bit-wise distinct from its predecessor but corresponded to the same logical window.)

MillWheel 使用 Bigtable 作为存储系统,Bigtable 对用户屏蔽了内部细节(相对于 读-修改-写 的操作),使得 checkpoint 就像 log 的行为一样。当一个进程启动的时候,checkpoint 会被重新读入内存并重放,当处理完成后 checkpoint 会被删除。(We use a storage system such as Bigtable, which efficiently implements blind writes (as opposed to read-modify-write operations), making checkpoints mimic the behavior of a log. When a process restarts, the checkpoints are scanned into memory and replayed. Checkpoint data is deleted once these productions are successful.)

6.1.3 Weak Productions and Idempotency

结合 Strong productions 以及 exactly-once 的消息传递性可以保证很多计算都是幂等的,包括系统层面的重试(Taken together, the combination of strong productions and exactly-once delivery makes many computations idempotent with regard to system-level retries.)

有些计算就算没有这些保证就已经是幂等的了(However, some computations may already be idempotent, regardless of the presence of these guarantees (which come with a resource and latency cost).)

因此 Strong productions 以及/或者 exactly-once 可以从用户层面取消(Depending on the semantic needs of an application, strong productions and/or exactly-once can be disabled by the user at their discretion.)

对于 weak productions, 我们不会在发送数据前进行 checkpoint,而是在保存状态前乐观的向下游广播数据,(For weak production, rather than checkpointing record productions before delivery, we broadcast downstream deliveries optimistically, prior to persisting state). 从经验得到,这会引入一个新的问题:整个流水线的完成时间会严重耦合,且依赖下游的 ACK(Empirically, this introduces a new problem, in that the completion times of consecutive stages of the pipeline are now strictly coupled as they wait for downstream ACKs of records.) 结合机器的故障率,端到端的延迟会随着流水线的深度增加而增加(Combined with the possibility of machine failure, this can greatly increase end-to-end latency for straggler productions as pipeline depth increases.) 例如,我们假设每台机器发生故障的概率为 1%,那么我们遇到故障的概率会随着流水线的深度递增而递增 – 对于深度为 5 的流水线,我们有 5% 的概率遇到至少一次机器故障(For example, if we assume (rather pessimistically) that there is a 1% chance that any machine will fail during a given minute, then the probability that we will be waiting on at least one failure increases disastrously with pipeline depth – for a pipeline of depth 5, a given production could have nearly a 5% chance of experiencing a failure every minute!)我们通过对部分延迟的数据流进行 checkpoint 来解决这一问题,允许这些接受者对他们的上游进行 ACK。通过这种选择性的 checkpoint 方式,我们既可以降低端到端的延迟,也可以减少所有资源的使用(We ameliorate this by checkpointing a small percentage of straggler pending productions, allowing those stages to ACK their senders. By selectively checkpoint in this way, we can both improve end-to-end latency and reduce overall resource consumption.)在 Figure 11 中,我们描述了这种 checkpoint 的具体实现。(In Figure 11, we show this checkpointing mechanism in action.) Computation A 的下游是 Computation B,而 Computation B 则马上将数据发送给下游 Computation C(Computation A produces to Computation B, which immediately produces to Computation C.) 然而 Computation C 的 ACK 则很慢,所以 Computation B 会在 1 秒的延迟之后进行 checkpointing。这样之后,Computation B 能够对上游 Computation A 进行 ACK,允许 Computation A 释放所占用的资源(However, Computation C is slow to ACK, so Computation B checkpoints the production after 1-second delay. Thus, Computation B can ACK the delivery from Computation A, allowing A to free any resources associated with the production.) 就算 Computation B 在后续阶段重启了,也能够从之前 checkpoint 的地方进行恢复,然后重新发送数据给下游 Computation C,而且没有数据丢失。(Even when Computation B subsequently restarts, it is able to recover the record from the checkpoint and retry delivery to Computation C, with no data loss.)

figure11_checkpoint.jpg

上面这种松散模式更适合具有幂等性的流水线,因为这样重试不会影响正确性,下游也是可以进行重试的。现实世界中一个幂等的例子就是无状态的过滤,任何时候输入相同的数据,会产生同样的输出。(The above relaxations would be appropriate in the case of a pipeline with idempotent computations, since retries would not affect correctness, and downstream production would also be retry-agnostic. A real-world example of an idempotent computation is a stateless filter, where repeated deliveries along input streams will not change the result.)

6.2 State Manipulation/状态操作

关于 MillWheel 的实现机制中如何操作状态,我们讨论了保存到后端存储的“hard” 状态,也讨论了保存在内存的“软”状态。我们需要满足如下几条对用户的保证:(In implementing mechanism to manipulate user state in MillWheel, we discuss both the “hard” state and that is persisted to our backing store and the “soft” state which includes any in-memory caches or aggregates. We must satisfy the following user-visible guarantess:)

  • 系统不会丢失数据(The system does not loss data)
  • 更新状态保证恰好一次的语义(Updates to state must obey exactly-once semantics)
  • 系统保存的数据在任何时候都应该是一致的(All persisted data throughout the system must be consistent at any given point in time)
  • Low watermarks 必须能够反应系统中排队的请求(Low watermarks must reflect all pending state in the system.)
  • 对于特定的 key,它相关的定时器必须按序触发(Timers must fire in-order for a given key.)

为了避免保存的状态数据不一致(比如触发器,用户状态,以及生产 checkpoints 之间),我们将每个 key 对应的状态数据更新封转为一个原子操作。(To avoid inconsistencies in persisted state (e.g. between timers, user state, and production checkpoints), we wrap all per-key updates in a single atomic operation.)这样可以适应系统任何时候的失败和中断(This results in resiliency against process failures and other unpredictable events that may interrupt the process at any given time.) 正如前面所说的,同一个操作中保证恰好一次的数据更新,并且对每个 key 的一致性状态数据提供保证(As mentioned previously, exactly-once data is updated in this same operation, adding it to the per-key consistency envelope)

由于计算会在机器之间迁移(负载均衡,故障或者其他原因),影响数据一致性的最大原因变为僵尸写入进程以及残留网络写入后端存储的概率(As work may shift between machines (due to load balancing, failures, or other reasons) a major threat to our data consistency is the possibility of zombie writers and network remnants issuing stale writes to our backing store.)。 为了解决这个问题,我们给每个 writer 附上一个序列化 token,后端存储会跟进这个 token 来验证写入是否合理。(To address this possibility, we attach a sequencer token to each write, which the mediator of the backing store checks for validity before allowing the write to commit.) 每个新工作进程首先将之前的工作进程设置为失效,这样就不会受之前进程的影响了。(New workers invalidate any extant sequencers before starting work, so that no remnant writes can succeed thereafter.) 这个序列号作为一个租约执行机制,就像 Centrifuge 系统一样。(The sequencer is functioning as a lease enforcement mechanism, in a similar manner to the Centrifuge system.) 因此,我们能够保证同一时刻点,对于特定的 key 只会有一个进程在进行写入。(Thus, we can guarantee that, for a given key, only a single worker can write to that key at a particular point in time.)

这种单写入进程的保证对软状态的维护同样重要,而且它不能通过事务进行处理(This single-writer guarantee is also critical to the maintenance of soft state, and it cannot be guaranteed by depending on transactions.) 以缓存写入缓慢为例:如果在构建缓存之后,来自另外一个进程的僵尸进程还在写入,则会导致缓存的不一致。(Take the case of a cache of pending timers: if a remnant write from another process could alter the persisted timer state after said cache was built, the cache would be inconsistent.)图 12 描述了这种情况,图中僵尸进程 B 由于外部因素,写入后端存储的事务没有按时到达后端。在事务实际开始前,B 的后继者,B-prime,执行定时器的扫描。在扫描完成之后,事务完成且 A 被 ACK,这样 B-prime 就拥有一个不完整的计时器状态。(This situation is illustrated by Figure 12, where a zombie process (B) issues a transaction that is delayed on the wire, in response to a production from A. Before the transaction begins, B’s successor, B-prime, performs its initial scan of pending timers. After this scan completes, the transaction is applied and A is ACKed, leaving B-prime with incomplete timer state.) 丢失的定时器可能被无限期的孤立,造成相应的输出延迟,这对于延迟敏感的系统是不能接受的(The lost timer could be orphaned indefinitely, delaying any of its output actions by an arbitrary amount of time, Clearly, this is unacceptable for a latency-sensitive system.)
figure12_transaction.jpg

此外,检测点也会遇到同样的情况,通过避免对后端存储的初始化扫描,系统的状态仍然是不可知的。(Furthermore, this same situation could occur with a checkpointed production, where it would remain unknown to the system by eluding an initial scan of the backing store.) 这种生产值直到产生 watermark 的时候才会被考虑到,而且这段时间内,我们可能向消费者报告一个错误的 watermark 值。另外,watermark 是单调递增的,所以我们无法对错误的 watermark 值进行纠正。违反 low watermark 的保证,则可能导致结果出错,其中包括提前触发定时器和产生不完整的窗口等。(This production would then note be accounted for in the low watermark until it was discovered, and in the intervening time, we might be reporting an erroneous low watermark value to consumers. Furthermore, since our low watermarks are monotonically increasing, we are unable to correct an erroneous advancement in the value. By violating our low watermark guarantees, a variety of correctness violations could occur, including premature timer firings and incomplete window productions.)

为了能够快速的从故障中进行恢复,MillWheel 中的每个算子可以在任何时刻进行任何粒度状态的 checkpoint(实践中,跟进数据量的不同,一般会有亚秒级别的,或者记录级别的 checkpoint)(In order to quickly recover from unplanned process failures, each computation worker in MillWheel can checkpoint its state at an arbitrarily fine granularity (in practice, sub-second or per-record granularity is standard, depending on input volume). 我们使用始终一致的软状态运行我们在特定的情况 – 机器宕机或者负载不均衡 – 进行最少的 checkpoint 数据扫描。在扫描 checkpoint 数据的时候,通常是异步的,这样运行在扫描 checkpoint 的时候同事也提供计算。(Our use of always-consistent soft state allows us to minimize the number of occasions when we must scan these checkpoints to specific cases – machine failures or load-balancing events. When we do perform scans, these can often be asynchronous, allowing the computation to continue processing input records while the scan progresses.)

7 System implementation

7.1 Architecture

MillWheel 部署在一组动态主机服务器上作为分布式系统提供服务。流水线中的每个计算会运行在一台或多态机器上,不同机器上的流通过 RPC 进行传递。在每台机器上,MillWheel 会管理输入以及进程级别的元数据,并且根据需要委托给相应的用户计算。(MillWheel deployments run as distributed systems on a dynamic set of host servers. Each computation in a pipeline runs on one or more machines, and streams are delivered via RPC. On each machine, the MillWheel system marshals incoming work and manages process-level metadata, delegating to the appropriate user computation as necessary.)

负载分配和均衡由一个复制的主机进行处理,它将每个计算分成一组有用不同密钥的单元(这些密钥会涵盖所有的可能性),然后将它们分发到一批机器上。(Load distribution and balancing is handled by a replicated master, which divides each computation into a set of owned lexicographic key intervals (collectively covering all key possibilities) and assigns these intervals to a set of machines.) 对于 CPU 压力过大或内存压力过大的情况(通过标准的进程监控得到),可以将这些单元进行移动,分割或者合并它们。每个单元被分配了一个唯一的序列号,当这个单元被移动,分割或者合并之后,这个序列号就失效了。这个序列号的重要性已经在 6.2 中进行了讨论。(In response to increased CPU load or memory pressure (reported by a standard perprocess monitor), it can move these intervals around, split them, or merge them. Each interval is assigned a unique sequencer, which is invalidated whenever the interval is moved, split, or merged. The importance of this sequencer was discussed in Section 6.2)

对于状态的存储,MillWheel 使用类似 Bigtable 或者 Spanner 类似的系统,提供原子性的,行级更新。同一个 key 的定时器,延迟的生产,以及保存的状态保存在同一行(For persistent state, MillWheel uses a database like Bigtable or Spanner, which provides atomic, single-row updates. Timers, pending productions, and persistent state for a given key are all stored in the same row in the data store.)

当一个 key 的一个最小单元由于机器故障导致被重新分配到其他机器上上,MillWheel 能投通过从后端存储中读取元数据快速的进行恢复。最初会在内存建立相应的堆式数据结构,用于保存堆积的定时器以及 checkpoint 过的输出,这些数据在整个 key 的生命周期中和后端存储中保持一致。为了保证这个特性,我们保证了单实例写入的语义 – 6.2 节中有详细描述。(MillWheel recovers from machine failures efficiently by scanning metadata from this backing store whenever a a key interval is assigned to a new owner. This initial scan populates in-memory structures like the heap of pending timers and the queue of checkpointed productions, which are then assumed to be consistent with the backing store for the lifetime of the interval assignment. To support this assumption, we enforce single-writer semantics(per-computation worker) that are detailed in Section 6.2)

7.2 Low Watermarks

为了保证数据的一致性,low watermark 必须被实现为一个子系统,而且能够全局能够访问且持续是正确的。我们将这实现为一个 central authority(类似 OOP),这个系统跟踪系统的所有 low watermark 且将他们记录到状态存储系统中,用以防止由于系统失败导致得到错误的 low watermark。(In order to ensure data consistency, low watermark must be implemented as a sub-system that is globally available and correct. We h have implemented this as a central authority (similar to OOP), which tracks all low watermark values in the system and journals them to persistent state, preventing the reporting of erroneous values in cases of process failure.)

当向 central authority 汇报数据的时候,每个进程会报自己工作的时间戳信息也带上。这包括已经 checkpoint 或者堆积的 production,堆积的定时器或者需要保存的状态信息。每个进程的效率依赖与我们内存中的保持一致性的数据结构,不需要去查询耗时更严重的后端存储。因为每个进程被分配给了每个 key 的一个最小单元,因此 low watermark 的更新也是以 key 的最小单元为单位而向 central authority 进行汇报。(When reporting to the central authority, each process aggregates timestamp information for all of its owned work. This includes any checkpointed or pending productions, as well as any pending timers or persisted state. Each process is able to do this efficiently by depending on the consistency of our in-memory data structures, eliminating the need to perform any expensive queries over the backing data store. Since processes are assigned work based on key intervals, low watermark updates are also bucketed into key intervals, and sent to the central authority.)

为了计算整个系统的 low watermark,这个 authority 必须能否访问所有堆积后以及处理过的工作的 low watermark 信息。当聚合每个进程的更新操作时,同时跟踪每个进程中的计算对应的区间所对应的 low watermark 值。如果某些 key 的最小单元丢失了,则丢失的单元的 low watermark 会被标记为上次得到的值,知道该单元的计算重新恢复为止。然后 authority 会在整个系统中对 low watermark 进行广播。(To accurately compute system low watermarks, this authority must have access to low watermark information for all pending and persisted work in the system. When aggregating per-process updates, it tracks the completeness of its information for each computation by building an interval map of low watermark values for the computation. If any interval is missing, then the low watermark corresponds to the last known value for the missing interval until it reports a new value. The authority then broadcasts low watermark values for all computations in the system.)

下游通过订阅它感兴趣的上游计算的 low watermark 值,从而计算它的整个输入的 low watermark 值。这个值由工作进程进行计算,而不是 authority 进行计算,与下面这个理由是一致的:central authority 的 low watermark 值应该不大于工作进程的 low watermark。通过有工作进行进行计算,central authority 的 low watermark 值就不会大于工作进程的 low watermark 值了,从而保证了这个属性。(Interested consumer computations subscribe to low watermark values for each of their sender computations, and thus compute the low watermark of their input as the minimum over these values. The reason that these minima are computed by the workers, rather than the central authority, is one of consistency: the central authority’s low watermark values should always be at least as conservative as those of the workers. Accordingly, by having workers compute the minima of their respective inputs, the authority’s low watermark never leads the workers’, and this property is preserved.)

为了能够在 central authority 保证一致性,我们给每个 low watermark 的更新附带上一个序列号。类似之前提到的单进程写入模式下更新本地 key 最小单元的状态,这个序列号保证只有最后的属主能够成功更新该 key 最小单元的 low watermark。为了扩展能力,authority 能够分布在多态机器上,每台 worker 机器上有一个或多个计算。经验上,这个能够在不损失性能的前提下扩展到 500000 个 key 最小单元。(To maintain consistency at the central authority, we attach sequencers to all low watermark updates. In a similar manner to our single-writer scheme for local updates to key interval state, these sequences ensure that only the latest owner of a given key interval can update its low watermark value. For scalability, the authority can be sharded across multiple machines, with one or more computations on each worker. Empirically, this can scale to 500000 key intervals with no loss in performance.)

在该系统中,我们还可以选择去除异常值,并为对速度特别要求的流水线提供启发式的 low watermark。比如,我们可以通过 99% 的记录时间戳得到一个 99% 的 low watermark。只对近似结果感兴趣的窗口消费者,可以利用这些 low watermark,从而避免等待那些晚到的数据。(Given a global summary of work in the system, we are able to optionally strip away outliers and offer heuristic low watermark values for pipelines that are more interested in speed than accuracy. For example, we can compute a 99% low watermark that corresponds to the progress of 99% of the record timestamps in the system. A windowing consumer that is only interested in approximate results could then use these low watermark values to operate with lower latency, having eliminated its need to wait on stragglers.)

总之,在我们的实现中,low watermark 对系统中的流没有时间顺序上的要求。low watermark 会反应正在进行的和已经保存的状态,通过建立一个全局的 low watermark 值来源,我们从逻辑上防止了不一致性,类似 low watermark 的回退。(In summary, our implementation of low watermarks does not require any sort of strict time ordering on streams in the system. Low watermarks reflect both in-flight and persisted state. By establishing a global source of truth for low watermark values, we prevent logical inconsistencies, like low watermarks moving backwards.)

8 Evaluation

为了说明 MillWheel 的性能,我们提供了针对流处理系统关键指标的试验结果

8.1 output latency

流处理系统中,延迟是对于性能的一个关键指标。MillWheel 系统支持低延迟的结果,并且随着系统的扩展而不增加延迟。我们使用了一个单 Stage 的消息传递,计算内容为排序的列子来进行说明 MillWheel 的性能。这类似于在连续计算中发生的多对多的 shuffle,是 MillWheel 中排序的会遇到的一个最差场景。Figure 13 展示了运行在 200 CPU 上的结果。消息传递的中位数延迟为 3.6 毫秒,95% 的延迟为 30 毫秒,这能够很好的满足 Google 内部对流系统的需求(95 线甚至在人类可能反应的时间内)(This resembles the many-to-many shuffle that occurs between successive computations that are keyed differently, and thus is a worst case of sorts for record delivery in MillWheel. Figure 13 shows the latency distribution for records when running over 200 CPUs. Median record delay is 3.6 milliseconds and 95th-percentile latency is 30 milliseconds, which easily fulfills the requirements for many streaming systems at Google (even 95th percentile is within human reaction time).
figure13_latency.jpg

这个测试是在关闭了 strong production 以及恰好一次的情况下进行的。当开启这两个特性的时候,延迟中位数变为 33.7 毫秒,95 线变为 93.8 毫秒。这个对比可以说明,计算本身的幂等性可以通过关闭这两个特性而大大降低延迟。(This test was performed with strong productions and exactly-once disabled. With both of these features enabled, median latency jumps up to 33.7 milliseconds and 95th-percentile latency to 93.8 milliseconds. This is a succinct demonstration of how idempotent computations can decrease their latency by disabling these two features.)

为了能够验证 MillWheel 的延迟不会随着系统的扩展而增加,我们将之前的单 Stage 实验跑在不同的 CPU 配置上,从 20 到 2000. Figure 14 展示了延迟的中位数基本保持不变。延迟的 99 线却会变糟(虽然都还在 100 毫秒以内)。然而,最差的延迟情况预计会随着规模的增加而增加 – 更多的资源意味着更多的出错可能。(To verify that MillWheel’s latency profile scales well with the system’s resource footprint, we ran the single-stage latency experiment with setups ranging in size from 20 CPUs to 2000 CPUs, scaling input proportionally. Figure 14 shows that median latency stays roughly constant, regardless of system size. 99th-percentile latency does get significantly worse (though still on the order of 100ms). However, tail latency is expected to degrade with scale – more machines mean that there are more opportunities for things to go wrong.)
figure14_latency_scalability.jpg

8.2 Watermark Lag

虽然某些计算(比如 Zeitgeist 中的波峰波谷检测)不需要定时器,但是其他许多计算(比如 dip detection)使用 low watermark 来触发定时器进行输出。对于这些计算,low watermark 的延迟将会导致结果的不新鲜。由于 low watermark 从计算图的源头开始进行产生,我们期望 low watermark 的延迟与流水线上计算与输入端的距离成正比。我们使用 200 CPU 运行了一个 3 级 MillWheel 流水线,并且每秒钟计算一次每个计算的 low watermark 值。图 15 中,我们可以看到,第一阶段的 low watermark 延迟了 1.8 秒,但是,对于后续的阶段,每阶段的延迟增加了不到 200 毫秒。怎么减少 watermark 的延迟现在还是一个正在发展中的领域。(While some computations (like spike detection in Zeitgeist) do not need times, many computations (like dip detection) use timers to wait for the low watermark to advance before outputting aggregates. For these computations, the low watermark’s lag behind real time bounds the freshness of these aggregates. Since the low watermark propagates from injectors through the computation graph, we expect the lag of a computation’s low watermark to be proportional to its maximum pipeline distance from an injector. We ran a simple three-stage MillWheel pipeline on 200 CPUs, and polled each computation’s low watermark value once per second. In Figure 15, we can see that the first stage’s watermark lagged real time by 1.8 seconds, however, for subsequent stages, the lag increased per stage by less than 200ms. Reducing watermark lag is an active area of development.
figure15_lowwatermark.jpg

Framework-Level Caching

由于 checkpoint 的频率很好,MillWheel 会产生非常多和存储层交互的流量。当使用 Bigtable 类似的系统时,读取的成本要高于写入,而 MillWheel 通过框架层面的缓存来解决这一问题。MillWheel 的一个常见用例是将数据缓存到存储引擎中,直到 low watermark 通过了窗口边界然后再获取这些数据进行聚合。这种使用模式对于存储系统中常见的 LRU 缓存是非常不利的,刚修改过的数据很可能并不是马上就需要读取的数据。MillWheel 知道它的数据怎么被使用,并且能够提供更好的缓存策略。在图 16 中,我们给出了工作进程和存储进程使用的 CPU 数与缓存大小的关系(出于商业机密考虑,CPU 使用率进行了标准化),增加缓存大小可以线性的增加 CPU 使用率(超过 550MB 的缓存,由于大部分数据已经被缓存,所有基本没有更大的作用)。在这个实验中,MillWheel 的缓存可以将 CPU 的使用率降低为原来的二分之一。
figure16_cache.jpg

8.4 Real-world Deployments

MillWheel 支持了 Google 内部的各种系统。为各种广告客户执行流连接,其中许多需要给客户展示低延迟的数据大盘。计费系统依赖于 MillWheel 的恰好一次保证。除了 Zeitgeist 之外,MillWheel 还提供一个广泛的异常检测服务,该服务被很多不同的团队看作开箱即用的服务使用。其他部署还包括了网络交换机以及集群运行状态监控。MillWheel 还支持了面向用户的工具,比如 Google 街景中的全景图像生成和处理。(MillWheel powers a diverse set of internal Google systems. It performs streaming joins for a variety of Ads customers, many of whom require low latency updates to customer-visible dashboards. Billing pipelines depend on MillWheel’s exactly-once guarantees. Beyond Zeitgeist, MillWheel powers a generalized anomaly-detection service that is used as a turnkey solution by many different teams. Other deployments include network switch and cluster health monitoring. MillWheel also powers user-facing tools like image panorama generation and image processing for Google Street View.)

同样还有 MillWheel 不适用的常见。单个操作太大导致在计算过程中对 checkpoint 不友好的情况不适合,因为系统的稳定依赖于动态负载均衡。如果负载均衡器遇到这样的算子,要么只能强制结束,然后调度到其他机器,或者等待它完成。前者浪费资源,后者则使机器过载。作为一个分布式系统,MillWheel 对于不同 key 之间不能够并行的场景也处理不好。如果整个流水线上 90% 的流量都和某个特定的 key 绑定在一起,那么某台机器必须能够处理该流水线的 90% 流量,这显然是不可取的。建议作业的开发人员避免会造成单台机器会处理很高流量的 key(比如使用用户使用的语言或者用于代理等),或者通过构建两级聚合来处理相应的事情。(There are problems that MillWheel is poorly suited for. Monolithic operations that are inherently resistant to checkpointing are poor candidates for inclusion in computation code, since the system’s stability depends on dynamic load balancing. If the load balancer encounters a host spot that coincides with such an operation, it must choose to either interrupt the operation, forcing it to restart, or wait until finishes. The former wastes resources, and the latter risks overloading a machines. As a distributed system, MillWheel does not perform well on problems that are not easily parallelized between different keys. If 90% of a pipeline’s traffic is assigned a single key, then one machine must handle 90% of the overall system load for that stream, which is clearly inadvisable. Computation authors are advised to avoid keys that are high-traffic enough to bottleneck on a single machine (such as a customer’s language or user-agent string), or build a two-phase aggregator.)

如果计算基于 low watermark 进行聚合,数据的延迟导致 low watermark 一直不能正常更新,MillWheel 的性能就会下降。这可能会导致系统中缓冲的数据产生数小时的偏差。通常情况下,内存使用量和数据倾斜成正比,因为作业依赖 low watermark 来熟悉缓冲区的数据。为了防止内存使用量不受限制的增长,有效的补救措施是通过等待注入较新的记录,直到 low watermark 继续前进,来限制系统中的总偏差。(If a computation is performing an aggregation based on low watermark timers, MillWheel’s performance degrades if data delays hold back low watermarks for large amounts of time. This can result in hours of skew over buffered records in the system. Oftentimes memory usage is proportional to skew, because an application depends on low watermark to flush this buffered data. To prevent memory usage from growing without bound, an effective remedy is to limit total skew in the system, by waiting to inject newer records until the low watermarks have advanced.)

Comments