Flink资料(6) -- 如何添加一个新的Operator

时间:2022-09-04 21:03:36

false

false
false
false

EN-US
ZH-CN
X-NONE


/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.0pt;
font-family:"Times New Roman",serif;}

如何添加一个新的Operator

翻译自How to add a new Operator

---------------------------------

Java API中可以通过多种途径添加Operator

1.    在DataSet上,以已存在的Operator为基础,组合或具现化(speciallzation)而形成新的Operator

2.    设计新的自定义扩展Operator(custom extension operator)

3.    设计新的运行时Operator(runtime operator)

前两种方法实现起来较为容易且轻量级。而对于运行时Operator,有时新的设计的确需要新的运行时Operator,或者使用运行时Operator会更加高效

一、在DataSet上实现一个新的Operator

许多Operator可以通过具现化另一个Operator或是UDF两种方式实现。

最简单的例子有DataSet上诸如sum(), min(), max()等方法。这些方法仅简单地用一些预先定义的参数调用其他Operator:

 public AggregateOperator<T> sum (int field) {
  return this.aggregate (Aggregations.SUM, field);
}

一些Operator可以通过多个其他Operator的组合实现,如通过map和aggregate的组合来实现count()方法。实现此功能更简单的方法是在DataSet上定义一个方法,按需调用map和reduce

 public DataSet<Long> count() {
  return this.map(new MapFunction<T, Long>() {
    public Long map(T value) {
      return 1L;
    }
  })
  .reduce(new ReduceFunction<Long>() {
    public Long reduce(Long val1, Long val1) {
      return val1 + val2;
    }
  });
}

如果我们定义一个新的Operator的同时,不想修改DataSet类,可以以静态方法的形式定义在另一个类中,此时,count()
Operator则如下所示:

 public static <T>DataSet<Long> count(DataSet<T> data) {
  return data.map(...).reduce(...);
}

1.1 更加复杂的Operator

通过具现化实现的更加复杂的例子是Java API中的Aggregation Operation,它通过GroupReduce
UDF的方法实现。

Aggregate Operation从其自己的Operator演化而来,将自己转换为Common API的GroupReduceOperatorBase。Java API的aggregation
Operator仅是一个接收聚合类型(aggregation
type)和成员位置(field
position)的构建者,它将这些信息作为参数给GroupReduce
UDF,由GroupReduce
UDF来进行聚合操作

由于操作被转换成一个 GroupReduce操作,它将在优化器和运行时环境中作为一个GroupReduceOperator出现。

二、实现一个自定义扩展Operator

DataSet提供了一个自定义Operator的方法:

DataSet<X>
runOperation(CustomUnaryOperation<T, X> operation)。接口CustomUnaryOperation通过两个方法来定义Operator。

 void setInput(DataSet<IN> inputData);

 DataSet<OUT> createResult();

VertexCentricIteration Operator即通过这种方式实现,下面是一个以此种方式实现count() Operator的例子:

 public class Counter<T> implements CustomUnaryOperation<T, Long> {
  private DataSet<T> input;   public void setInput(DataSet<IN> inputData) { this.input = inputData; }   public DataSet<Long> createResult() {
    return input.map(...).reduce(...);
  }
}

该Operator调用方式如下:

 DataSet<String> lines = ...;
DataSet<Long> count = lines.runOperation(new Counter<String>());

三、实现一个新的运行时Operator

添加一个新的Runtime Operator需要对整个技术栈做出修改,从API到运行时:

1.    Java API

2.    Common API

3.    Optimizer

4.    Runtime

我们将自底向上描述,以方法mapPartition()为例(类似map方法,只不过每个并行分区仅调用一次)

1. 运行时(RunTime)

Runtime Operator使用接口Drive实现,该接口定义了描述运行时中Operator的方法。MapDriver便是那些Operator如何工作的简单例子。

与运行时一同运行的还有MutableObjectIterator,它描述了可以重用对象的数据流,以达到减少垃圾回收的压力的目的。

mapPartition Operator的核心方法run()可能具有以下形式:

 public void run() throws Exception {
  final MutableObjectIterator<IN> input = this.taskContext.getInput(0);
  final MapPartitionFunction<IN, OUT> function = this.taskContext.getStub();
  final Collector<OUT> output = this.taskContext.getOutputCollector();
  final TypeSerializer<IN> serializer = this.taskContext.getInputSerializer(0);
  // we assume that the UDF takes a java.util.Iterator, so we wrap the MutableObjectIterator
  Iterator<IN> iterator = new MutableToRegularIteratorWrapper(input, serializer);   function.mapPartition(iterator, output);
}

为了提高运行效率,以链式(chained
version)实现一个Operator总是有好处的。链接在一起的Operator作为它们的前驱Operator,在同一个线程下运行,并且可以使用嵌套循环地调用。这会省去许多序列化/反序列化的开销,从而大大增加效率。

我们可以通过MapDriver(正常的)和ChainedMapDriver(链式变种)来学习如何实现链式Operator

2. 优化器/编译器

该部分简单讨论了添加Operator的重要步骤,有关优化器的工作原理见Optimizer。为了使优化器将新的Operator纳入其优化方案,我们需要向它提供一些信息,如下所示:

1.    DriverStrategy:要使得优化器可以访问到新的Operation,新加的Operation需要加入枚举类。枚举类入口(entry)参数定义了什么类实现了runtime operator,它的链接的版本是什么,Operator是否需要累积数据(即需要内存),以及它是否需要Comparator(用于key)。在我们的例子中,我们可以添加~~~java MAP_PARTITION(MAPPartitionDriver.class, null/*或链接的版本*/, PIPELINED, false); ~~~

2.    Cost
function::类CostEstimator需要Operation对系统的开销的信息。这里的“开销”是指Operator的non-UDF的部分。由于我们的Operator本质上并没有这部分工作(直接将数据流传递给UDF),则此开销为0。我们通过向costOperator(…)中的switch语句中添加常量MAP_PARTITION,类似MAP常量,以标识该操作没有开销。

3.    OperatorDescription:Operator的描述类定义了优化器如何处理一个Operation。它描述了Operation需要什么样的输入数据(如有序的、分区的),和允许的优化器以全局方法来优化数据操作(data
movement)、排序、分组的方法。为了描述上述信息,我们需要描述Operator拥有什么RequestedGlobalProperties(分区操作、拷贝操作)和RequestLocalProperties(排序、分组、单一提取(uniqueness)),以及Operator如何影响已存在的GlobalPropertiesLocalProperties。此外,该OperatorDescription还定义了一些支持方法,例如实例化一个候选Operator(Operator
candidate)的方法等。由于mapPartition()的功能非常简单(无需分区/分组等),它的描述类也十分简单,其他Operator则具有更加复杂的需求,如Hash Join 1Hash Join 2SortMerge Join。下面的示例代码解释了如何为MapPartitionOperator创建描述类:

 public DriverStrategy getStrategy() {
  return MAP_PARTITION;
}
// Instantiate the operator with the strategy over the input given in the form of the Channel
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
  return new SingleInputPlanNode(node, "MapPartition", in, MAP_PARTITION);
} // The operation accepts data with default global properties (arbitrary distribution)
protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
  return Collections.singletonList(new RequestedGlobalProperties());
} // The operation can accept data with any local properties. No grouping/sorting is necessary
protected List<RequestedLocalProperties> createPossibleLocalProperties() {
  return Collections.singletonList(new RequestedLocalProperties());
} // the operation itself does not affect the existing global properties.
// The effect of the UDF's semantics// are evaluated separately (by interpreting the
// semantic assertions)
public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
  return gProps;
} // since the operation can mess up all order, grouping, uniqueness, we cannot make any statements
// about how local properties are preserved
public LocalProperties computeLocalProperties(LocalProperties lProps) {
  return LocalProperties.EMPTY;
}

4.    OptimizerNode:优化器节点控制着所有该方面工作,它创建了OperatorDescriptor的列表,实现了结果数据集规模的估计,并且给Operator赋予其名字。此外,它相对来说是一个较小的类,故而可以从MapNode重新拷贝

3. Common API

为了使得Operation可以用于更高级的API,需要将它添加到Common API中去。最简单的方法就是添加一个base
operator。我们以类MapOperatorBase为模板,创建类MapPartitionOperatorBase。

此外,优化器需要清楚OptimizerNode如何从OperatorBase创建一个OptimizerNode,该功能在Optimizer中的调用GraphCreatingVisitor类实现。

注意:我们仍在考虑通过统一OptimizerNode和Common API
Operator来跳过这一步骤,因为它们本质上实现的是同一个功能。Common API
Operator存在的原因仅仅是使得flink-java和flink-scala包不依赖与Optimizer。

4. Java API

创建一个Java API的方式与MapOperator的方式是一样的,其中核心方法就是translateToDataFlow(…)方法,它为Java API
operator创建了Common API
operator。

最后一步就是向类DataSet添加相关方法

 public <R> DataSet<R> mapPartition(MapPartitionFunction<T, R> function) {
  return new MapPartitionOperator<T, R>(this, function);
}

Flink资料(6) -- 如何添加一个新的Operator的更多相关文章

  1. 012&period;Adding a New Field --【添加一个新字段】

    Adding a New Field 添加一个新字段 2016-10-14 3 分钟阅读时长 作者 By Rick Anderson In this section you'll use Entity ...

  2. Linux 在添加一个新账号后却没有权限怎么办

    当添加一个新账号后,我们可能会发现新账号sudo 时会报告不在sudoers中,使用su -s时输入密码后也会认证失败 上网搜索大部分都要求修改/etc/sudoers中的内容,但修改这个文件必须需要 ...

  3. Mysql学习&lpar;一&rpar;添加一个新的用户并用golang操作Mysql

    Mysql添加一个新的用户并赋予权限 添加一个自己的用户到mysql 首先我们需要先用root用户登录mysql,但是刚安装完没有密码,我们先跳过密码 ailumiyana@ailumiyana:~/ ...

  4. RK平台Android4&period;4 添加一个新的遥控器支持以及添加特殊按键【转】

    本文转载自:http://blog.csdn.net/coding__madman/article/details/52904063 版权声明:本文为博主原创文章,未经博主允许不得转载. 瑞芯微平台 ...

  5. 【IntelliJ IDEA】添加一个新的tomcat,tomcat启动无法访问欢迎页面,空白页,404

    ===================================第一部分,添加一个tomcat================================================== ...

  6. Android4&period;0 添加一个新的Android 键值

    这里添加新的键值,不是毫无凭据凭空创造的一个键值,而是根据kernel中检测到的按键值,然后转化为Android所需要的数值: 以添加一个Linux键值为217,把它映射为android的键值Brow ...

  7. 【转】windows7的桌面右键菜单的&OpenCurlyDoubleQuote;新建”子菜单,在注册表哪个位置,如何在&OpenCurlyDoubleQuote;新建&quot&semi;里面添加一个新项

    点击桌面,就会弹出菜单,然后在“新建”中就又弹出可以新建的子菜单栏.office与txt 的新建都是在这里面的.我想做的事情是:在右键菜单的“新建” 中添加一个“TQ文本”的新建项,然后点击它之后,桌 ...

  8. linux采用模块方法,添加一个新的设备

    该文转载自:http://rangercyh.blog.51cto.com/1444712/521244 系统调用是操作系统内核和应用程序之间的接口,而设备驱动程序是操作系统内核和机器硬件之间的接口. ...

  9. 向Dialog中添加一个新的Menu

    1.创建一个新的Menu,在资源管理视图中,右键Menu-->传入Menu 2.设计新Menu,ID为IDR_MENU1 3.在该Dialog的源文件中,找到CTest001Dlg::OnIni ...

随机推荐

  1. adobe premiere pro cc2015&period;0已停止工作 解决办法

    adobe premiere pro cc2015.0已停止工作 一直报错 解决办法就是: 删除我的电脑  我的饿文档下的 Adobe下的Premiere Pro文件夹 现象就是怎么重新安装都不管用P ...

  2. Unity Standard Assets 简介之 Vehicles

    这篇介绍载具资源包Vehicles. 主要包含Aircraft(飞行器)和Car(车辆)两部分,两个文件夹里分别有AircraftGuidelines.txt和CarGuidelines.txt对相关 ...

  3. Vs2012出现停止工作问题的解决方法

    我的VS2012总是出现问题,打开项目会,更改移动控件位置也会,后来在网上找到了解决方法 这是出现问题

  4. 一行命令实现Android自动关机

    前几天晚上失眠,实在睡不着觉,于是想用Nexus7听一听小野丽莎的歌,在安静祥和之中睡去(怎么感觉有点...)但是不能让平板总是这么循环播放吧(屋里吐槽Google Play Music),所以在平板 ...

  5. 懂,你的App生,不懂,死!

    近期有一些开发人员.创业公司的人加我微信viyi88,咨询一些关于自己App的事情.被问得最多的可能就是:"我的App怎样推广添加下载量?"而且信誓旦旦地说自己的App做得非常好, ...

  6. C&plus;&plus;中引用

    在C语言中&这个符号表示了取地址符,但是在C++中它却有着不同的用途,掌握C++的&符号,是提高代码执行效率和增强代码质量的一个很好的办法.一.引用简介 引用就是某一变量(目标)的一个 ...

  7. 论事件驱动与异步IO

    通常我们写服务器模型,有以下几种模型: 每收到一个请求,创建一个新的进程,来处理该请求 每收到一个请求,创建一个新的线程,来处理该请求 每收到一个请求,放入到一个事件中,让主程序通过非阻塞I/0方式来 ...

  8. js面向对象的理解

    ECMAScript 有两种开发模式:1.函数式(过程化),2.面向对象(OOP).面向对象的语言有一个标志,那就是类的概念,而通过类可以创建任意多个具有相同属性和方法的对象.但是,ECMAScrip ...

  9. 库增删该查,表增删该查,记录增删该查,表与表关系&lpar;多对多,多对一,一对一&rpar;,mysql用户管理

    库增删该查 增加库 create database db1 create database db1 charset="gbk 查看库 show databases 查看所有库 show cr ...

  10. pooling的几种形式&lpar;转&rpar;

    转载地址:http://blog.csdn.net/malefactor/article/details/51078135    原作者:张俊林 CNN是目前自然语言处理中和RNN并驾齐驱的两种最常见 ...