calcite在flink中的二次开发,介绍解析器与优化器

时间:2024-02-22 22:07:19

calcite 在flink中的二次开发

  • 1 CodeGen
  • 2 flink 语法扩展
    • 2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制
    • 2.2下面附上一个 利用codegen来生成所需类的例子:
  • 3 flink使用calcite 生成解析器FlinkSqlParserImpl
    • 3.1 FlinkSqlParserImpl 的生成
      • 3.1.1 flink 引入 calcite
      • 3.1.2 fmpp 生成 Parser.jj
      • 3.1.3 javacc 生成 parser
      • 3.1.4 看看 Parser
      • 3.1.5 blink planner 引入 flink-sql-parser
  • 4 calcite 规则优化器
    • 4.1 什么是查询优化器
    • 4.2 基于规则优化(RBO)
    • 4.3 基于成本优化(CBO)
    • 4.4 优化规则
      • 4.4.1 谓词下推(Predicate Pushdown)
      • 4.4.2 常量折叠(Constant Folding)
      • 4.4.3 列裁剪(Column Pruning)
  • 扩展资料

关于calcite的概念相关的内容,在我另一篇帖子
深入理解flinksql执行流程,扩展解析器实现语法的扩展

1 CodeGen

首先阐述一下 codegen:
Codegen是基于ObjectWeb ASM的低开销的java代码生成器,他可以根据预先填好的规则与条件,通过编译代码,自动生成java类
在这里插入图片描述

在递归调用各个节点 DataStreamRel 的 translateToPlan 方法时,会利用CodeGen元编程成Flink的各种算子,就相当于我们直接利用Flink的DataSet或DataStream API开发的程序。
还是以上面的Demo为例,跟踪进 DataStreamScan 的 translateToPlan 方法中,会发现相关逻辑:

  1. 首先生成 function 代码的字符串形式,并封装成 GeneratedFunction 对象;
  2. 然后使用 CodeGen 进行编译;
  3. 在需要使用 Function 的时候使用反射进行加载使用。

后续在 扩展 flink语法(如join维表)时,需要针对上述步骤,拼接生成 function 的字符串形式。
在这里插入图片描述

2 flink 语法扩展

了解完 Flink Sql 的执行流程之后,就可以针对 Flink Sql 做语法、功能上的扩展。

在Flink老版本上,Flink不支持 COUNT(DISTINCT aaa) 语法,但是如果需要对 Flink 做此功能拓展,需要结合 前面说到的 Flink Sql 执行流程,做相应修改。

修改点:

  1. 在进行 Rule 规则匹配时,放开对 Distinct 的限制
  2. DataStreamRelNode 转为 DataStream 过程中,拼接CodeGen所需的 Function String

2.1 在进行 Rule 规则匹配时,放开对 Distinct 的限制

在 DATASTREAM_OPT_RULES.DataStreamGroupWindowAggregateRule 中放开对 Distinct 的限制:
在这里插入图片描述

2.2下面附上一个 利用codegen来生成所需类的例子:

新建一个项目 ,从源码中拷贝出codegen的代码文件夹在这里插入图片描述
在配置文件中,添加好,sql的保留字,关键字,类的名字等信息,这里就不多说了,有需要的同学可以百度具体的原理技术
新建一个SqlUseFunction.java 这个就是上文中说到的function 代码的字符串形式,
在flink中,就是通过 拼装来拼装出一个类,调用codegen来进行编译得到继承抽象类SqlNode 的方法,所以在开发完的源代码中是找不到codegen相关的东西的,但实际他是参与了工作的。

package com;

import org.apache.calcite.sql.*;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

public class SqlUseFunction extends SqlCall {

    private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE FUNCTION",
            SqlKind.OTHER_FUNCTION);

    private final SqlIdentifier funcName;
    private final SqlNodeList funcProps;

    /**
     * SqlUseFunction constructor.
     *
     * @param pos sql define location
     * @param funcName function name
     * @param funcProps function property
     * */
    public SqlUseFunction(SqlParserPos pos, SqlIdentifier funcName, SqlNodeList funcProps) {
        super(pos);
        this.funcName = funcName;
        this.funcProps = funcProps;
    }

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("USE FUNCTION");
        funcName.unparse(writer, leftPrec, rightPrec);
        if (funcProps != null) {
            writer.keyword("WITH");
            SqlWriter.Frame frame = writer.startList("(", ")");
            for (SqlNode c : funcProps) {
                writer.sep(",");
                c.unparse(writer, 0, 0);
            }
            writer.endList(frame);
        }
    }

    @Override
    public SqlOperator getOperator() {
        return OPERATOR;
    }

    @Override
    public List<SqlNode> getOperandList() {
        return ImmutableNullableList.of(funcName, funcProps);
    }

}

pom文件中指定好,使用fmpp技术,以及codegen的地址等

    <build>
        <plugins>
            <!-- adding fmpp code gen -->
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-fmpp-resources</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/codegen</outputDirectory>
                            <resources>
                                <resource>
                                    <directory>src/main/codegen</directory>
                                    <filtering>false</filtering>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <!-- 从calcite-core.jar提取解析器语法模板,并放入在${project.build}freemarker模板所在的目录 -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.8</version>
                <executions>
                    <execution>
                        <id>unpack-parser-template</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>unpack</goal>
                        </goals>
                        <configuration>
                            <artifactItems>
                                <artifactItem>
                                    <groupId>org.apache.calcite</groupId>
                                    <artifactId>calcite-core</artifactId>
                                    <version>1.18.0</version>
                                    <type>jar</type>
                                    <overWrite>true</overWrite>
                                    <outputDirectory>${project.build.directory}/</outputDirectory>
                                    <includes>**/Parser.jj</includes>
                                </artifactItem>
                            </artifactItems>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <configuration>
                    <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
                    <outputDirectory>target/generated-sources</outputDirectory>
                    <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
                </configuration>
                <groupId>com.googlecode.fmpp-maven-plugin</groupId>
                <artifactId>fmpp-maven-plugin</artifactId>
                <version>1.0</version>
                <dependencies>
                    <dependency>
                        <groupId>org.freemarker</groupId>
                        <artifactId>freemarker</artifactId>
                        <version>2.3.28</version>
                    </dependency>
                </dependencies>
                <executions>
                    <execution>
                        <id>generate-fmpp-sources</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>generate</goal>
                        </goals>
                    </execution>
                </executions>

            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>javacc-maven-plugin</artifactId>
                <version>2.4</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <id>javacc</id>
                        <goals>
                            <goal>javacc</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
                            <includes>
                                <include>**/Parser.jj</include>
                            </includes>
                            <lookAhead>2</lookAhead>
                            <isStatic>false</isStatic>
                            <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

这里引入一张图 说明calcite中jacc和fmpp在其中起的作用
在这里插入图片描述
看到codegen中的fmpp了么 就是这个fmpp

3 flink使用calcite 生成解析器FlinkSqlParserImpl

以下面这个案例出发(代码基于 flink 1.13.1 版本):

public class ParserTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(10);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Tuple3<String, Long, Long>> tuple3DataStream =
                env.fromCollection(Arrays.asList(
                        Tuple3.of("2", 1L, 1627254000000L),
                        Tuple3.of("2", 1L, 1627218000000L + 5000L),
                        Tuple3.of("2", 101L, 1627218000000L + 6000L),
                        Tuple3.of("2", 201L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 7000L),
                        Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
                        .assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Long, Long> element) {
                                        return element.f2;
                                    }
                                });

        tEnv.registerFunction("mod", new Mod_UDF());

        tEnv.registerFunction("status_mapper", new StatusMapper_UDF());

        tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
                "status, id, timestamp, rowtime.rowtime");

        String sql = "SELECT\n"
                + "  count(1),\n"
                + "  cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
                + "FROM\n"
                + "  source_db.source_table\n"
                + "GROUP BY\n"
                + "  tumble(rowtime, INTERVAL '1' DAY)";

        Table result = tEnv.sqlQuery(sql);

        tEnv.toAppendStream(result, Row.class).print();

        env.execute();

    }

}


debug 过程如之前分析 sql -> SqlNode 过程所示,如下图直接定位到 SqlParser:

在这里插入图片描述

如上图可以看到具体的 Parser 就是 FlinkSqlParserImpl。

定位到具体的代码如下图所示(flink-table-palnner-blink-2.11-1.13.1.jar)。

在这里插入图片描述
最终 parse 的结果 SqlNode 如下图。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再来看看 FlinkSqlParserImpl 是怎么使用 calcite 生成的。
具体到 flink 中的实现,位于源码中的 flink-table.flink-sql-parser 模块(源码基于 flink 1.13.1)。
flink 是依赖 maven 插件实现的上面的整体流程。

3.1 FlinkSqlParserImpl 的生成

在这里插入图片描述
接下来看看整个 Parser 生成流程。

3.1.1 flink 引入 calcite

使用 maven-dependency-plugin 将 calcite 解压到 flink 项目 build 目录下。

<plugin>
    <!-- Extract parser grammar template from calcite-core.jar and put
         it under ${project.build.directory} where all freemarker templates are. -->
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <executions>
        <execution>
            <id>unpack-parser-template</id>
            <phase>initialize</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <artifactItem>
                        <groupId>org.apache.calcite</groupId>
                        <artifactId>calcite-core</artifactId>
                        <type>jar</type>
                        <overWrite>true</overWrite>
                        <outputDirectory>${project.build.directory}/</outputDirectory>
                        <includes>**/Parser.jj</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>


在这里插入图片描述

3.1.2 fmpp 生成 Parser.jj

使用 maven-resources-plugin 将 Parser.jj 代码生成。

<plugin>
    <artifactId>maven-resources-plugin</artifactId>
    <executions>
        <execution>
            <id>copy-fmpp-resources</id>