Flink SQL example: setting a watermark and windowing

Posted: 2025-04-20 18:25:04

Main code: ProvinceStatsSqlApp

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author zyj
 * @Date 2022/1/19 10:08
 */
public class ProvinceStatsSqlApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Environment setup
        // 1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 1.3 Parallelism
        env.setParallelism(4);

        // TODO 2. Checkpointing
        // 2.1 Enable checkpointing
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 2.3 Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        // 2.4 Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.5 State backend: memory | filesystem | RocksDB
        env.setStateBackend(new FsStateBackend("hdfs:///ck/gmall"));
        // 2.6 User for HDFS access
        System.setProperty("HADOOP_USER_NAME", "zyj");

        // TODO 3. Read from the source topic and register it as a dynamic table
        String topic = "dwm_order_wide";
        String groupId = "province_stats_app_group";

        ("CREATE TABLE order_wide (" +
                " province_id BIGINT, " +
                " province_name STRING," +
                " province_area_code STRING," +
                " province_iso_code STRING," +
                " province_3166_2_code STRING," +
                " order_id STRING," +
                " split_total_amount DOUBLE," +
                " create_time STRING," +
                " rowtime as TO_TIMESTAMP(create_time) ," +                     // 创建水位线固定格式
                " WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND " +    // 创建水位线固定格式
                ") WITH (" + (topic, groupId) + ")");

        // TODO 4. Group by province, open a tumbling window, and aggregate
        Table provinceStatTable = tableEnv.sqlQuery("select " +
                " DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt," +
                " DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt," +
                " province_id," +
                " province_name," +
                " province_area_code area_code," +
                " province_iso_code iso_code ," +
                " province_3166_2_code iso_3166_2," +
                " count(distinct order_id) order_count," +
                " sum(split_total_amount) order_amount," +
                " UNIX_TIMESTAMP() * 1000 as ts " +

                " from " +
                " order_wide " +
                " group by " +
                " TUMBLE(rowtime, INTERVAL '10' SECOND)," +    // 开窗固定格式
                " province_id,province_name,province_area_code,province_iso_code,province_3166_2_code");

        // TODO 5. Convert the dynamic table to a DataStream
        DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(provinceStatTable, ProvinceStats.class);

        provinceStatsDS.print(">>>>");

        // TODO 6. Write the stream to ClickHouse
        provinceStatsDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into  province_stats_0224  values(?,?,?,?,?,?,?,?,?,?)")
        );

        env.execute();
    }
}
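
The query result is converted into a ProvinceStats bean that the post does not show. Below is a minimal sketch of what such a bean could look like, assuming field names that match the SELECT aliases above and Lombok for the boilerplate; the exact types and annotations are assumptions, not the original class.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical bean: field names follow the SELECT aliases, and the field order matches
// the positional "values(?,?,?,?,?,?,?,?,?,?)" insert handled by ClickHouseUtil.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProvinceStats {
    private String stt;            // window start, yyyy-MM-dd HH:mm:ss
    private String edt;            // window end
    private Long province_id;
    private String province_name;
    private String area_code;
    private String iso_code;
    private String iso_3166_2;
    private Long order_count;      // count(distinct order_id)
    private Double order_amount;   // sum(split_total_amount)
    private Long ts;               // UNIX_TIMESTAMP() * 1000
}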

Utility class: MyKafkaUtil

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

/**
 * @author zyj
 * @Date 2022/1/6 15:29
 * Utility class for working with Kafka
 */
public class MyKafkaUtil {
    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
    private static final String DEFAULT_TOPIC = "default_topic";

    // Get a Kafka consumer
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
    }

    // Get a Kafka producer (String payload)
   /*
   Note: the implementation below only guarantees that data is not lost (at-least-once), not exactly-once
   public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<String>(
                KAFKA_SERVER,
                topic,
                new SimpleStringSchema()
        );
    }*/
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        // Producer transaction timeout; must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");

        // Create the Kafka producer with a custom serializer; KafkaSerializationSchema<String> fixes the payload type to String
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(topic, str.getBytes());
            }
        }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    // Get a Kafka producer (generic payload type)
    public static <T> FlinkKafkaProducer<T> getKafkaSinkSchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
        // Producer transaction timeout
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");

        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    public static String getKafkaDDL(String topic, String groupId) {
        String ddl = "'connector' = 'kafka'," +
                "'topic' = '" + topic + "'," +
                "'' = '" + KAFKA_SERVER + "'," +
                "'' = '" + groupId + "'," +
                "'' = 'latest-offset'," +
                "'format' = 'json'";
        return ddl;
    }
}
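
A minimal usage sketch (an assumption, not part of the original post): reading a topic with getKafkaSource and writing it back out through the exactly-once producer. The demo class, consumer group and output topic names are placeholders; checkpointing must be enabled, because the EXACTLY_ONCE producer only commits its Kafka transactions when a checkpoint completes.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyKafkaUtilDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Transactions are committed on checkpoint completion, so checkpointing is required for EXACTLY_ONCE
        env.enableCheckpointing(5000L);

        DataStream<String> source =
                env.addSource(MyKafkaUtil.getKafkaSource("dwm_order_wide", "demo_group"));
        source.addSink(MyKafkaUtil.getKafkaSink("dwm_order_wide_copy"));

        env.execute("MyKafkaUtilDemo");
    }
}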

Utility class: ClickHouseUtil

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author zyj
 * @Date 2022/1/18 9:15
 */
public class ClickHouseUtil {
    public static <T> SinkFunction<T> getJdbcSink(String sql) {
        // insert into visitor_stats_0224 values(?,?,?,?,?,?,?,?,?,?,?,?)
        SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
                sql,
                new JdbcStatementBuilder<T>() {
                    // Read the field values of the stream element obj and bind them to the ? placeholders; obj is one record from the stream
                    @Override
                    public void accept(PreparedStatement ps, T obj) throws SQLException {
                        // Get the declared fields of the element's class
                        Field[] fields = obj.getClass().getDeclaredFields();
                        // Iterate over the fields
                        int skipNum = 0;
                        for (int i = 0; i < fields.length; i++) {
                            // Current field
                            Field field = fields[i];

                            // Skip fields carrying the @TransientSink marker annotation (not written to ClickHouse)
                            TransientSink transientSink = field.getAnnotation(TransientSink.class);
                            if (transientSink != null) {
                                skipNum++;
                                continue;
                            }

                            // Allow access to private fields
                            field.setAccessible(true);

                            // Read the field value
                            try {
                                Object fieldValue = field.get(obj);
                                // Bind the value to the matching ? placeholder (skipped fields shift the index back)
                                ps.setObject(i + 1 - skipNum, fieldValue);
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }
                        }

                    }
                },
                new JdbcExecutionOptions.Builder()
                        // Batch inserts: each parallel subtask flushes once it has buffered 5 records
                        .withBatchSize(5)
                        // Flush at the latest every 2000 ms, even if the batch is not full
                        .withBatchIntervalMs(2000)
                        // Retries on failed inserts
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        // JDBC driver class
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        // ClickHouse URL (host and port), taken from the project's GmallConfig constants
                        .withUrl(GmallConfig.CLICKHOUSE_URL)
                        .build()
        );
        return sinkFunction;
    }
}
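
Two project-specific classes are referenced above but not shown: the @TransientSink marker annotation and the GmallConfig constants class. Below is a minimal sketch of what they could look like (each would live in its own file in the project's own packages); the package layout, database name, host and port are assumptions.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marker annotation: fields carrying it are skipped when binding the JDBC ? placeholders.
// RUNTIME retention is required so that field.getAnnotation(...) can see it via reflection.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

// Hypothetical constants class; the URL below is a placeholder (8123 is ClickHouse's default HTTP/JDBC port).
public class GmallConfig {
    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop101:8123/default";
}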