Flink 中 State 用于保存 Task 的状态,Checkpoint 的时候,会将 State 保存到外存中。

State 有两种,Keyed State 和 Operator State,每一种则可以有两种形式存在:Managed 和 Raw。其中 Keyed State 只能引用在 Keyed Stream 上,在 Flink 中使用 keyBy() 创建一个 keyed Stream. Flink 保证同一个 key 的 Tuple 会被发送到同一个 task 进行处理,Operator State 使用 ListState(用于 rescale),上层是一个 HashMap Key 是 name,value 是一个封装了 list 的 class。

State 的入口在 AbstractStateBackend,AbstractStateBackend 的子类有

  • MemoryStateBackend
  • RocksDBStateBackend
  • FsStateBackend

本文主要分析 MemoryStateBackend 和 RocksDBStateBackend。如无特殊说明,本文所有代码均使用附[1]

state-hierarchy.png

MemoryStateBackend

MemoryStateBackend 使用内存作为存储主要包括一些配置项(内存大小 等)以及创建 operator/keyed state 的接口,本文会创建一个 HeapKeyedStateBackend

HeapKeyedStateBackend 会提供创建各种 State 的接口(ValueState,ListState,MapState,SortedMapState,ReducingState,FoldingState,AggregatingState,RawQueueState,RawSecodaryState,RawStortedSecondaryState 等),返回当前 state 的 snapshot(checkpoint 会用到) 以及从指定的 KeyedStateHandles 恢复的接口。

其中 createValueState 则创建一个 HeapValueState

一些重要的变量和方法如下所示:

  • 变量
    • entries – 记录具体的值,类型为 Map>> 每个 value 由 唯一确定
    • defaultValue – 表示 StateValue 的初始值,没有则为 null
    • backend – 当前 state 所属的 stateBackend
    • stateDescriptor – 当前 state 的描述信息,主要包括当前 state 的 namespace、value 已经面向用户的 descriptors 等
  • 方法
    • createNewMap() – 创建一个 map,跟进是否为 Queryable 的 state,选择是否创建 ConcurrentHashMap
    • snapshot(int, keyGroup, DataOutputView outputView) – 将指定 keyGroup 的 key 进行一次 snapshot,写入到指定的 outputView 中
    • restore(int keyGroup, DataInputView intputView) – 从 inputView 中恢复
    • get(int keyGroup, K key, N namespace) – 获取特定的 value
    • clear(int keyGroup, K key, N namespace) – 清空特定的 value
    • clear(Set> stateKeys) – 清空特定的 value
    • value() – 返回特定的 value
    • update(V value) – 更新特定的 value
  • update(int keyGroup, K key, N namespace, V value)

程序流程解释

程序和 State 相关的流程如下:

  1. 50 行创建一个 MemoryState,且设置 DefaultValue 为 (0, 0)
  2. 每次处理一个 Tuple 的时候,会首先读取当前 key 对应的 value(第 25 行)
  3. 然后进行处理后,更新 state 值(28 - 34 行)
  4. 最后跟进 state 的值判断是否进行相应处理 – 往下有发送平均值,以及清空 state (37 - 40 行)

state 的值变化如下所示(红色的 0 为调用 clear 后生成)

f0 f1
0 0
1 3
2 8
0 0
1 7
2 11
0 0
1 2

RocksDBStateBackend

RocksDBStateBackend 和 MemoryStateBackend 的区别主要在于,使用 RocksDB 替代 Memory 来存储 State。

由于 RocksDB 是一个 Key-Value Store,因此存储数据结构,和 MemoryStateBackend 稍微不一样。其中 key 为 serialized(keyGroup, key, namespace), value 为 serialized(value)

自问自答

  • 有哪些 StateBackend 实现,区别都是什么,每一种的优劣是什么
    • StateBackend 的实现在文首给出来,其中 MemoryStateBackend 会很快,但是不仅行持久化,RocksDBStateBackend 使用 RocksDB 进行 State 存储,速度快,且会存储到持久化介质上
    • MemoryStateBackend 的局限:
      • 每个 state 的大小有上限限制,默认 5M,可配置
      • state 大小不能超过 akka frame size(其他的 statebackend 是否可以呢?)
      • JobManager 的内存需要能够存放所有的 state
      • 建议使用场景:开发和调试阶段;state 不大的场景
    • RocksDBStateBackend
      • RocksDBStateBackend 将 in-flight 数据存在 RocksDB 中,会存放在 TaskManager 的 data 目录下
      • 经常使用异步 Snapshot
      • 局限如下:
        • 每个 key value 的大小不能超过 2^31(因为RocksDB 的 JNI bridge API 使用 byte[] 格式)
      • 建议使用场景:
        • state 量很大,window 窗口很长的 job
        • 其他需要 high-available 的场景
      • 支持增量 checkpoint
    • FsStateBackend
      • 将 in-flight 数据存储在 TaskManager 的内存中,checkpoint 的时候将数据存储到外存
      • 默认使用异步 snapshot
      • 建议使用场景:
        • 大 state,window 窗口很长的 job
        • 其他需要 high-available 的场景
  • 作业选择不同 StateBackend 的标准是什么
    • 跟进速度和是否需要持久化到外存选择?
  • 不同 StateBackend 保存的数据结构是什么样的
    • MemoryStateBackend 包括多种 State,其中 HeapValueState 的数据结构为 Map>>
    • RocksDBStateBackend 则存储的是 serialized(keyGroup, key, namespace) <-> serialized(value) 的形式
  • 不同 StateBackend 中内存占用怎么估计/计算
    • 暂时还不知道
  • 如果需要迁移 State 数据,怎么完成的(作业的并发进行调整)
  • 哪些节点/角色会有 StateBackend,都用来做什么
    • 所有需要保存 State 的节点(?)JobManager、Task 等
  • Keyed Stream 中不同的 key 会被发送到同一个 task 吗?key 往下发送的逻辑是什么
    • 根据 hash(key) 往下游发送
  • keyGroup 和什么有关,生成规则是什么,在查问题的时候会用到吗(某个 value 属于哪个 keyGroup)
    • keyGroup 的存在主要用于 rescale task,能够避免 random io 等
    • 现在 keyGroup 在 Job 启动的时候确定,后面不会进行变更
    • key 属于哪个 keyGroup 由 hash 函数确定

参考文献

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/state_backends.html

[1] 本文使用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class StreamingState {
public static void main(String[] args) throws Exception {
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
env.execute("Streaming WordCount");
}
}
class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Overridelus
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}

[2] keyGroup 的好处

state-rescale.png

Comments