Apache Beam中的有状态计算

时间:2021-12-15 14:03:03

      Beam帮助我们处理流式、乱序、大规模的数据,并且提供了高度的抽象机制Pipeline,统一了流式和批量数据处理。

      从功能上流处理可以分为无状态(stateless)的和有状态(stateful)两种。在流处理的框架里引入状态管理大大提升了系统的表达能力,让用户能够很方便地实现复杂的处理逻辑,是流处理在功能上的一个飞跃。以下提到State或者状态是相同的概念。

      有状态计算是Apache Beam一个新特性,它能够让我们更加灵活的应对应用场景。在这篇文章中,我们将介绍如何在Beam中进行有状态的计算:

  1. 它的原理是什么

  2. 它如何与Beam的其他特性结合使用

  3. 我们可以使用它做什么

  4. 代码示例

      在开始之前,我们首先回顾一下,在Beam中,大数据的并行处理表达为一个有向无环图,接收从PCollections传入的数据,由PTransforms进行处理。在下文的描述中将以此为基础进行展开。


Apache Beam中的有状态计算

      如上图所示,矩形框是PTransforms,线表示PCollection(Beam中表示数据的容器),PTransform接受PCollection作为输入,处理之后,输出到下一个PTransform。PCollection可以是有限的数据集(批处理)可以是无限的数据集(流式)。圆柱表示外部的数据输入和数据输出,例如不在变化的日志文件(有限数据集),再比如从Kafka中获取的数据流(无线数据集)。

      在Beam的数据处理中有两个重要的概念:ParDo和GroupBy。ParDo复杂在并行计算中对每一个元素进行处理,GroupByKey(非常类似于CombinePerKey)负责按照key进行数据的聚合。如下图中所示,颜色表示元素的Key,相同的颜色表示具有相同的Key。

      GroupByKey/CombinePerKey收集所有的绿色方块,进行聚合计算(求和、计算均值等)之后输出1个数据元素。


Apache Beam中的有状态计算

      上图是最简单的用户场景,由ParDo/Map和GroupByKey/CombinePerKey进行无状态的计算。现实的应用场景比这要复杂,无状态的计算在很多情况下是不够的。本文的主题就是介绍Beam的新特性:对每个元素进行有状态的计算


Apache Beam中的有状态计算

      在上图中,Beam的ParDo增加了一个State机制(右侧的圆柱体,同样颜色表示元素的Key,不同的颜色表示不同的Key),State机制提供了API允许在处理数据的时候高效的读取和写入数据。State根据key进行分区(Partition),同样还支持窗口机制,稍后我们会说明为什么State要进行这样分区。

      接下来的文章中,我们将详细的说明State新特性的细节:

  1. 从high-level视角上,State的工作机制
  2. 与现有的特性有什么不同
  3. 在引入State新特性的情况下,是如何保证海量数据处理的扩展性的
  4. 介绍完上述细节之后,通过简单的例子,看看如何应用State特性。

有状态的计算在Beam中是如何工作的

      在ParDo中具体的代码逻辑体现在DoFn中,DoFn负责处理每一个输入元素。没有State,DoFn就是一个纯粹的函数,接收一个输入,输出给一个或者多下下游,对应与MapReduce中Mapper。增加了有状态计算State之后,DoFn就能够暂存中间计算结果、原始数据等。


Apache Beam中的有状态计算

      首先注意图中的方块、三角、原型都是相同的颜色,这表示示例中的有状态计算发生在同一个Key上。黄色的@ProcessElement表示Beam的底层大数据引擎(Spark、Flink等)会对DoFn的调用入口。紫色的ProcessContext.output表示DoFn对大数据引擎的调用入口。

  • • 每来一条数据,对于每一个key和窗口组合,调用DoFn的@ProcessElement
  • • DoFn读写State,由图中的紫色曲线表示。
  • • DoFn
    通过ProcessContext.output(或者ProcessContext.sideOutput).向下游(output)或者旁路(side
    output)发送输出结果。

      从上层视角来看,State的原理挺直观。在我们日常的编程工作中,有一个例子与State特别类似,在循环中,我们不断的读写一个变量来统计已经处理了多少条数据了。

State有状态计算是如何与Beam模型统一的

      在说明State有状态计算是如何与Beam模型统一的之前,首先设想一下另一种处理状态的情形:CombineFn。在Beam中,可以实现自己的Combine.perKey(CombineFn)。

      下边的图描绘了CombineFn的原理,底层的引擎会调用CombineFn对每一个key创建一个累加器accumulator ,最终从累加器中获取结果,并发送给下游:


Apache Beam中的有状态计算

      跟上边描述有状态DoFn的时候一样,方块、三角、圆形都是红色的,也就是说对单个Key进行Combine。图中的方法addInput和extractOutput都是黄色的表示,都是被底层的引擎触发调用:引擎调用addInput函数,更新当前的累加器accumulator 。

  • • 执行引擎负责管理累加器
  • • 当引擎判断需要将结果发送出去的时候,就会调用extractOutput方法。

      到目前为止CombineFn和有状态的DoFn非常相似,在实际应用的时候有几个非常重要的区别:

  • • 引擎控制所有的调用和状态的持久化,开发人员无法决定什么时候用何种方式持久化中间数据,也无法决定什么时候累加器accumulator该销毁(基于触发器),什么时候该将结果从累加器中读取出来,发送给下游。
  • • 在CombineFn中只能有一个中间数据的存储的地方—累加器。在有状态的DoFn中,只能读取需要的数据和写入改变的数据。
  • • 不需要扩展DoFn就能获得一些有用的特性,例如一个输出多个输出(multi output)或者旁路输出(side output)。(这些特性可以用复杂的CombineFn组合来模拟,但是这样去实现的话,显得非常别扭,而且效率会非常低。而且有一些特性例如Side input、窗口机制都是CombineFn所不具备的。)

      底层引擎在执行的时候主要是调用CombineFn中的mergeAccumulators方法,这可以带来巨大的优化:底层引擎可以在对多个输入调用多个CombineFn实例,最后再合并成一个结果

      如下图所示:


Apache Beam中的有状态计算

      对于CombineFn,无论聚合操作多复杂,相同的输入,数据的输出必须是严格相同的(函数式编程的特性)。

      因为上图中的合并操作并不是由有状态的DoFn函数来处理的,所以底层的引擎无法在执行时将数据进行分流处理,也没法重新聚合数据。注意,此时DoFn需要应对数据乱序的情况,但是DoFn的输出可以是不严格相同的。

      至此,我们DoFn和CombineFn的不同之处说完了。Beam中遵循的是函数式编程的不可变性理念,有状态的计算跟不可变性的理念稍微有些冲突,但是在实际的场景中,完全的函数式变成会让我们在应对某些应用场景的时候,代码实现会特别繁琐,所以有状态的计算引入了可变性。

示例: 为乱序的数据赋予连续的Index编号

      假设我们要处理数据流,数据流按照Key+Window的方式进行分组,在每一个分组内,按照数据的处理顺序,为每一条数据分配一个编号,要求编号连续,并且不中断。在进入代码之前,先从概念模型上说明一下,我们希望设计一个转换逻辑(PTransform),如下图所示:


Apache Beam中的有状态计算

      元素A,B,C,D,E的乱序的,因此它们分配的编号也是乱序的,在这个场景里保证编号是唯一的、连续的,并且每个元素不会漏掉就够了。
      从概念上将,这很类似于一个for循环,唯一需要记录的是下一个编号是多少。

  • • 当元素进入的时候,为元素附上一个编号
  • • 然后将编号+1作为下一个元素的编号

      说到这里,我们来谈论大数据和并行性,上图中的算法根本不可并行化! 如果你想在整个PCollection上应用这个逻辑,你一次只能处理PCollection的1个元素,这显然不行。 绝大多数情况下,有状态的计算中编写的ParDo代码应该能够让底层的执行引擎并行执行。

      Beam中的状态单元(state cell)被限定为key+window对。 当DoFn以“index”的名称读取或写入状态时,它实际上正在访问由“index”指定的可修改的单元(mutable cell,Beam使用了函数是编程的概念,变量被视为是不可修改的,一般表述为immutable不可变,mutable imutable对应于java中的变量和常量)以及当前处理的key和window。 所以,当考虑状态单元格时,可以将转换的完整状态看成一张表,根据在程序中使用的名称命名行,如“index”,列是key + window对,像这样:


Apache Beam中的有状态计算

      表中有很多的列,可以增加分布式计算时的并行度。有两种特殊的情况:

  • • 窗口(Window)很少,每个窗口内包含很多的key,例如全局只有一个窗口,用userId作为key进行计算。
  • • Key很少,每个key包含很多的窗口(window),例如使用全局的key,对key中的每个固定长度窗口进行计算。

    注意:所有支持Beam的引擎现在只支持基于key并行计算,对Window进行并行计算。
    

      大部分情况下,需要关注一个列(key窗口对)就够了,跨越过个列的计算在设计上是不允许存在的。

Beam’s Java SDK中State

      现在已经在Beam模型中讨论了有状态计算,并通过一个抽象的例子进行了说明,接下来看一下使用Beam的Java SDK编写状态处理代码。 以下是一个状态DoFn的代码,它可以在每个按键窗口的基础上为每个元素分配一个唯一、连续的索引:

new DoFn<KV<MyKey, MyValue>, KV<Integer, KV<MyKey, MyValue>>>() {

  // A state cell holding a single Integer per key+window
  @StateId("index")
  private final StateSpec<Object, ValueState<Integer>> indexSpec = 
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") ValueState<Integer> index) {
    int current = firstNonNull(index.read(), 0);
    context.output(KV.of(current, context.element()));
    index.write(current+1);
  }
}

      代码分析:

  • 首先要看的是代码中有一些@StateId(“index”)注解。 在DoFn中使用名为“index”State。 Beam Java
    SDK,以及此层的执行引擎,会利用这些注解对DoFn进行组装,以便能够运行在引擎上。

  • 第一个@StateId(“index”)在StateSpec类型的字段上注解(对于“state specification”),声明并配置状态一个State Cell单元格。
    类型参数ValueState描述了您可以从该单元格中值的类型 - ValueState只存储1个值。 注意,规范本身不是可用的State
    cell状态单元格,只是一个类型的声明 ,在运行的时候,由底层的执行引擎负责,根据State Spec提供一个运行时的State
    Cell。

  • ValueState需要指定对其中存储的值对应的Coder(有时候可以不必指定)来序列化将要存储的值。
    调用StateSpecs.value(VarIntCoder.of())即可。
  • 第二个@StateId(“index”)注解在@ProcessElement方法的参数上。 表示使用先前声明的ValueState。
  • 以最简单的方式访问状态:read()读取,使用write(newvalue)来写入新的值。
  • DoFn的其他特性仍然像原来一样使用即可 - 例如context.output(…),
    还可以使用sideInput,sideOutput,窗口等。

      关于SDK和执行引擎与DoFn之间的几个注意事项:

  • 如果声明了一个状态单元格,然后使用错误的类型,则Beam Java SDK会提示错误。

  • 如果声明了具有相同IDState,则Beam SDK也将提示错误。

  • 执行引擎可以判断是否是一个有状态的DoFn,如果是,在执行的时候与无状态的DoFn相比,执行逻辑的差异会很大,例如通过附加的Shuffing和同步来避免对State的并发访问,以提升性能。

示例:异常侦测

      假设我们用一个复杂的模型,接收用户的一系列动作,并用定量的方式去判断用户行为,例如检测欺诈活动。 我们需要用事件建模,并将输入的事件与最新的模型进行比较,以确定某些内容是否发生变化。

      如果尝试将CombineFn的来表达模型,使用mergeAccumulator可能会遇到一系列的问题。 假设使用CombineFn,像下边这样:

class ModelFromEventsFn extends CombineFn<Event, Model, Model> {
    @Override
    public abstract Model createAccumulator() {
      return Model.empty();
    }

    @Override
    public abstract Model addInput(Model accumulator, Event input) {
      return accumulator.update(input); //使用变量来提高性能 }

    @Override
    public abstract Model mergeAccumulators(Iterable<Model> accumulators) {
      //代码
    }

    @Override
    public abstract Model extractOutput(Model accumulator) {
      return accumulator; }
}

      现在,可以使用Combine.perKey(new ModelFromEventsFn())来计算窗口(Window)中的某个特定用户的模型。 那么如何将此模型应用于计算相同事件的流程? 标准的做法是,在ParDo中处理PCollection的元素时,将Combine为ParDo的sideInput读入。 所以可以通过sideInput读取模型并对事件流进行处理,输出预测结果,如下所示:

PCollection<KV<UserId, Event>> events = ...

final PCollectionView<Map<UserId, Model>> userModels = events
    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());

PCollection<KV<UserId, Prediction>> predictions = events
    .apply(ParDo.of(new DoFn<KV<UserId, Event>>() {

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        UserId userId = ctx.element().getKey();
        Event event = ctx.element().getValue();

        Model model = ctx.sideinput(userModels).get(userId);

        // 如果需要可以在发出之前添加自己的代码逻辑
        … c.output(KV.of(userId, model.prediction(event))) … 
      }
    }));

      在这个Pipeline中,每个用户每个窗口由Combine.perKey(…)发出的1个模型,然后通过View.asMap()transform准备sideInput。 ParDo对事件的处理将阻塞,直到sideInput准备就绪,缓冲事件,然后将根据模型判断每个事件。 这是一个高延迟,high compelete的解决方案:该模型使用窗口中的所有用户行为进行计算,所以窗口超时之前不能输出任何输出。

      假设想更早的获得预测结果,极端的情况下甚至没有窗口,而只是想使用目前已有的数据持续建模并预测,即使模型可能不完整。 如何控制更新正在使用的模型? 触发器Trigger是通用的Beam特性,用于在数据完整性与延迟取得折中。 以下的代码是在上边代码的基础上增加了一个触发器Trigger,每1秒钟计算一个新的模型出来:

PCollection<KV<UserId, Event>> events = ...

PCollectionView<Map<UserId, Model>> userModels = events

    // 在延迟和成本之间的权衡
    .apply(Window.triggering(
        AfterProcessingTime.pastFirstElementInPane(Duration.standardSeconds(1)))

    .apply(Combine.perKey(new ModelFromEventsFn()))
    .apply(View.asMap());

      这种做法通常是延迟和计算成本之间一个不错的权衡,如果1秒钟之内发生了大量的时间,那么每1秒钟才会更新一次模型,不会因为频繁的更新模型而导致性能急剧下降。在实际中,因为sideInput的缓存和处理的延迟,虽然模型是按照1秒1次进行更新的,但是真正使用的模型可能是很多秒之前的(sideInput的缓存和处理的延迟时间+1秒)。所以很多时间,甚至有可能所有的事件都使用的都不是最新的模型进行处理的。如果底层的引擎使用了足够小的缓存过期策略,选择激进的触发器Trigger策略,可能会降低延迟,但是会带来额外的计算成本。

      除此之外,还有另一种成本需要考虑:在ParDo中向下游发送了大量事件,其中很多事件的对下游来说无用,如果只能在上游的output中定义哪些需要发送给下游,那么就无法使用Filter PTransform来过滤数据,减少向下游发送的数据量。
有状态的计算可以解决sideInput的延迟和向下游发送大量无用事件的问题。代码如下:

new DoFn<KV<UserId, Event>, KV<UserId, Prediction>>() {

  @StateId("model")
  private final StateSpec<Object, ValueState<Model>> modelSpec =
      StateSpecs.value(Model.coder());

  @StateId("previousPrediction")
  private final StateSpec<Object, ValueState<Prediction>> previousPredictionSpec =
      StateSpecs.value(Prediction.coder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("previousPrediction") ValueState<Prediction> previousPredictionState,
      @StateId("model") ValueState<Model> modelState) {
    UserId userId = c.element().getKey();
    Event event = c.element().getValue()

    Model model = modelState.read();
    Prediction previousPrediction = previousPredictionState.read();
    Prediction newPrediction = model.prediction(event);
    model.add(event);
    modelState.write(model);
    if (previousPrediction == null 
        || shouldOutputNewPrediction(previousPrediction, newPrediction)) {
      c.output(KV.of(userId, newPrediction));
      previousPredictionState.write(newPrediction);
    }
  }
};

      代码分析:

  • • 在代码中声明了两个State, @StateId(“model”) 来保存用户的当前模型,
    @StateId(“previousPrediction”) 来保存上一个预测结果。
  • • 跟之前一样在 @ProcessElement 里添加两个State作为参数。
  • • 通过
    modelState.read()来读取当前的模型,因为State是按照key+window进行分区的,所以读取的model对应于当前处理的事件的UserId。
  • • 通过 model.prediction(event)
    计算出预测结果,与上个预测结果做比对,上个预测结果使用previousPredicationState.read()获取。
  • • 然后调用 model.update() 并写入到State种modelState.write(…).
  • • 如果跟上次的预测结相比发生和很大的变化,调用 context.output(…)
    发送给下游,并且使用previousPredictionState.write(…)将最新的预测结果保存起来。在此处的判断是根据上一个预测结果,而不是上一个计算结果,在实际应用中,判断逻辑要比代码示例中的要复杂。

性能考虑

      在决定是否要使用基于key和窗口的State之前,需要了解它是如何运行的。对于每一个底层的引擎来说,实现不尽相同,但是有一些通用的注意的点:

  • • 按照key和窗口分区

      这种方式下,引擎需要进行shuffle操作,将具有相同key和窗口的数据汇聚到一起。如果能在前边的数据处理中已经shuffle,底层的引擎会利用这一点提升性能。(此处可以参考Spark中所谓的窄依赖和宽依赖的说明,窄依赖可以形成流水线,减少shuffle的次数,对性能的提升有很大的帮助)。

  • • 同步的成本

      虽然底层的引擎是分布式并行执行的,但是对于一个同一个key+窗口来说,state操作只能是同步的,所以可能会导致性能下降。

  • • State的存储和容错问题

      State是基于key+窗口的,key越多窗口越多,读写对存储State(内存、数据库等等)的并发访问越高,对容错性和一致性的要求也越高,这意味着State的存储必须提供足够高的性能。

  • • State过期

      State是基于key+窗口的的,底层的引擎必须要跟踪每一个窗口,才能够在窗口过期(例如当Watermark超过了最大的允许的延迟)的时候,回收state所占用的资源,跟踪每一个窗口和回收state所占用的资源也会带来性能的开销。