Main code: ProvinceStatsSqlApp
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// MyKafkaUtil, ClickHouseUtil and the ProvinceStats bean are classes from this project
/**
* @author zyj
* @Date 2022/1/19 10:08
*/
public class ProvinceStatsSqlApp {
public static void main(String[] args) throws Exception {
// TODO 1. Prepare the environments
// 1.1 Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 Table execution environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.3 Set the parallelism
env.setParallelism(4);
// TODO 2. Configure checkpointing
// 2.1 Enable checkpointing
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
// 2.2 Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 2.3 Restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
// 2.4 Keep externalized checkpoints after the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 2.5 State backend: memory | filesystem | RocksDB
env.setStateBackend(new FsStateBackend("hdfs:///ck/gmall"));
// 2.6 User for accessing HDFS
System.setProperty("HADOOP_USER_NAME", "zyj");
// TODO 3. Read from the Kafka source and register it as a dynamic table
String topic = "dwm_order_wide";
String groupId = "province_stats_app_group";
("CREATE TABLE order_wide (" +
" province_id BIGINT, " +
" province_name STRING," +
" province_area_code STRING," +
" province_iso_code STRING," +
" province_3166_2_code STRING," +
" order_id STRING," +
" split_total_amount DOUBLE," +
" create_time STRING," +
" rowtime as TO_TIMESTAMP(create_time) ," + // 创建水位线固定格式
" WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND " + // 创建水位线固定格式
") WITH (" + (topic, groupId) + ")");
//TODO 4. Group, window, and aggregate
Table provinceStatTable = tableEnv.sqlQuery("select " +
" DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as stt," +
" DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND),'yyyy-MM-dd HH:mm:ss') as edt," +
" province_id," +
" province_name," +
" province_area_code area_code," +
" province_iso_code iso_code ," +
" province_3166_2_code iso_3166_2," +
" count(distinct order_id) order_count," +
" sum(split_total_amount) order_amount," +
" UNIX_TIMESTAMP() * 1000 as ts " +
" from " +
" order_wide " +
" group by " +
" TUMBLE(rowtime, INTERVAL '10' SECOND)," + // 开窗固定格式
" province_id,province_name,province_area_code,province_iso_code,province_3166_2_code");
// TODO 5. Convert the dynamic table into a DataStream
DataStream<ProvinceStats> provinceStatsDS = tableEnv.toAppendStream(provinceStatTable, ProvinceStats.class);
provinceStatsDS.print(">>>>");
// TODO 6. Write the stream to ClickHouse
provinceStatsDS.addSink(
ClickHouseUtil.getJdbcSink("insert into province_stats_0224 values(?,?,?,?,?,?,?,?,?,?)")
);
env.execute();
}
}
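The ProvinceStats bean referenced above is not shown in the post. A minimal sketch of what it could look like is given below, assuming its fields follow the columns selected by the SQL query and the column order of province_stats_0224; the use of Lombok and the exact field names are assumptions, not the author's code.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical sketch: the declaration order of the fields must match the ? placeholders
// in the INSERT statement, because ClickHouseUtil binds them by reflecting over declared fields in order.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProvinceStats {
    private String stt;           // window start
    private String edt;           // window end
    private Long province_id;
    private String province_name;
    private String area_code;
    private String iso_code;
    private String iso_3166_2;
    private Long order_count;     // count(distinct order_id)
    private Double order_amount;  // sum(split_total_amount)
    private Long ts;              // statistics timestamp
}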
Utility class MyKafkaUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
/**
* @author zyj
* @Date 2022/1/6 15:29
* Utility class for working with Kafka
*/
public class MyKafkaUtil {
private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
private static final String DEFAULT_TOPIC = "default_topic";
// Build a Kafka consumer (source)
public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);
}
// Build a Kafka producer (String records)
/*
Note: this implementation only guarantees that no data is lost (at-least-once); it cannot guarantee exactly-once.
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
return new FlinkKafkaProducer<String>(
KAFKA_SERVER,
topic,
new SimpleStringSchema()
);
}*/
public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
Properties props = new Properties();
// Kafka broker addresses
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
// Transaction timeout for the exactly-once producer (15 minutes)
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");
// Create the producer with a custom KafkaSerializationSchema<String> that routes every record to the given topic
return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
return new ProducerRecord<byte[], byte[]>(topic, str.getBytes());
}
}, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
// Build a Kafka producer (generic record type)
public static <T> FlinkKafkaProducer<T> getKafkaSinkSchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
// Kafka broker addresses
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
// Transaction timeout for the exactly-once producer (15 minutes)
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 15 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
public static String getKafkaDDL(String topic, String groupId) {
String ddl = "'connector' = 'kafka'," +
"'topic' = '" + topic + "'," +
"'' = '" + KAFKA_SERVER + "'," +
"'' = '" + groupId + "'," +
"'' = 'latest-offset'," +
"'format' = 'json'";
return ddl;
}
}
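Because getKafkaSinkSchema leaves topic selection to the serialization schema it is handed, the caller has to build that schema itself. Below is a minimal usage sketch under that assumption; the class name, method name, and topic string are illustrative only, not part of the original project.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class KafkaSinkSchemaExample {
    // Hypothetical helper: build a sink that routes each String record to the given topic.
    public static FlinkKafkaProducer<String> sinkFor(final String topic) {
        return MyKafkaUtil.getKafkaSinkSchema(new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                // The schema, not the producer, decides the target topic here
                return new ProducerRecord<>(topic, element.getBytes());
            }
        });
    }
}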
Utility class ClickHouseUtil
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @author zyj
* @Date 2022/1/18 9:15
*/
public class ClickHouseUtil {
public static <T> SinkFunction<T> getJdbcSink(String sql) {
// insert into visitor_stats_0224 values(?,?,?,?,?,?,?,?,?,?,?,?)
SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>() {
// Copy the field values of obj (one record from the stream) into the ? placeholders
@Override
public void accept(PreparedStatement ps, T obj) throws SQLException {
// Get the declared fields of the record's class
Field[] fields = obj.getClass().getDeclaredFields();
// Iterate over the fields
int skipNum = 0;
for (int i = 0; i < fields.length; i++) {
// Current field
Field field = fields[i];
// Skip fields annotated with @TransientSink
TransientSink transientSink = field.getAnnotation(TransientSink.class);
if (transientSink != null) {
skipNum++;
continue;
}
// Make private fields accessible
field.setAccessible(true);
try {
// Read the field value and bind it to the corresponding ? placeholder
Object fieldValue = field.get(obj);
ps.setObject(i + 1 - skipNum, fieldValue);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
},
new JdbcExecutionOptions.Builder()
// Batch size: each parallel subtask flushes once it has buffered 5 records
.withBatchSize(5)
// Flush interval: also flush whatever is buffered every 2 seconds
.withBatchIntervalMs(2000)
// Number of retries on a failed insert
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
// ClickHouse JDBC driver class (assumed; the original value was lost in the post)
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
// ClickHouse host and port, taken from the project's GmallConfig constants
.withUrl(GmallConfig.CLICKHOUSE_URL)
.build()
);
return sinkFunction;
}
}
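ClickHouseUtil relies on a custom @TransientSink annotation (marking bean fields that should not be written to ClickHouse) which is not shown above. A minimal sketch of what it might look like, under the assumption that it carries no attributes:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks bean fields that getJdbcSink should skip when binding ? placeholders.
// Retention must be RUNTIME so that field.getAnnotation(TransientSink.class) can see it.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

GmallConfig is the project's constants class; its CLICKHOUSE_URL would be the ClickHouse JDBC URL (of the form jdbc:clickhouse://host:8123/database), but its exact value is not shown in the original post.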