Apache beam其他学习记录

时间:2022-12-22 19:34:45

Combine与GroupByKey

GroupByKey是把相关key的元素聚合到一起,通常是形成一个Iterable的value,如:

cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
Combine是对聚合后的Iterable进行处理(如求和,求均值),返回一个结果。内置的Combine.perKey()方法其实是GroupByKey和Combine的结合,先聚合和处理。

Beam中还有许多内置的处理类,比如Sum.integersPerKey(),Count.perElement()等

在全局窗口下,对于空输入,Combine操作后一般会返回默认值(比如Sum的默认返回值为0),如果设置了.withoutDefault(),则返回空的PCollection。

在非全局窗口下,用户必须指明空输入时的返回类型,如果Combine的输出结果要作为下一级处理的输入,一般设置为.asSingletonView(),表示返回默认值,这样即使空窗口也有默认值返回,保证了窗口的数量不变;如果设置了.withoutDefault(),则空的窗口返回空PCollection,一般作为最后的输出结果。


Platten与Patition

用于PCollection与PCollectionList的转换。

官方文档给的Platten代码很容易理解:

// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
将一个PCollectionList={ PCollection{String1}, PCollection{String2}, PCollection{String3} }转换为一个PCollection={String1, String2, String3}.

而Patition刚好反过来,要将PCollection转换为PCollectionList需要指明分成的list长度以及如何划分,因此需要传递划分长度size和划分方法Fn。

// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
其中partitionFor()方法返回的是在PCollectionList中的位置下标。


Side Input

不能使用硬编码数据,通常是转换中间产生的数据。一般用于跟主输入数据进行比较,因此要求Side Input数据的窗口要与主输入数据的窗口尽量一致,如果不一致,Beam会尽可能地从Side Input中找到合适的位置的数据进行比较。对于设置了多个触发器的Side Input,自动选择最后一个触发的结算结果。


附属输出数据 Additional Outputs

这一部分官方的代码已经写得很清楚,看代码即可。


数据编码

在Pipeline的数据处理过程中经常需要对数据元素进行字节转换,因此需要制定字节转换的编码格式。对于绝大部分类型的数据,Beam都提供了默认的编码类型,用户也可以通过SetCoder指定编码类型。

1)从内存读取的输入数据一般要求用户指定其编码类型;

2)用户自定义的类对象一般要求用户指定其编码类型,或者可以在类定义上使用@DefaultCoder(AvroCoder.class)指定默认编码类型。


其他:

Beam不是线程安全的,一般建议处理方法是幂等的。