Big Data: Building a Real-Time Data Warehouse (Part 2)

Date: 2025-04-24 09:34:50

When the data volume is large and there are many dimensions, computing everything with Flink's keyBy becomes unwieldy. A better approach is to write the detail data into an external real-time warehouse: ClickHouse excels at real-time queries, while Flink excels at real-time processing.

I. Multi-Dimensional Complex Statistics (Using ClickHouse)

We use ClickHouse's ReplacingMergeTree engine: within the same partition, rows with the same id are merged and only the latest version is kept. This property can be used so that Flink + ClickHouse (just barely) achieve data consistency.

The problem: rows written to ClickHouse are not merged immediately; merging happens either in background merges or when OPTIMIZE is run manually.

The workaround: append the FINAL keyword after the table name when querying, so that only the latest version of each row is read (see the example queries after the table definition below), at the cost of slower queries.

How should the ClickHouse table be designed?

1. Support multi-dimensional queries (a wide table)
2. Support querying by time range (store the time as table columns and partition by time)
3. Support PV/UV statistics (deduplicated queries)
4. Support partitioning (partition by time)
5. Support overwriting old versions (ReplacingMergeTree; when query accuracy matters, append FINAL after the table name)
6. How to generate a unique ID: build it in the Kafka source from topic + partition + offset
7. The same record must always land in the same partition (partition by the time carried in the data, i.e. EventTime)

1. Create the table

CREATE TABLE tb_user_event
(
    `id` String comment 'unique record id: Kafka topic + partition + offset',
    `deviceId` String comment 'device ID',
    `eventId` String comment 'event ID',
    `isNew` UInt8 comment 'new-user flag: 1 = new, 0 = returning',
    `os` String comment 'OS name',
    `province` String comment 'province',
    `channel` String comment 'download channel',
    `eventTime` DateTime64 comment 'event time carried in the data',
    `date` String comment 'eventTime formatted as yyyyMMdd',
    `hour` String comment 'eventTime formatted as HH',
    `processTime` DateTime comment 'system time when the row was inserted'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
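
A quick sanity check on the design: the queries below are a sketch of how this table could be used; the date value '20210321' is only an illustrative placeholder.

-- Multi-dimensional PV/UV for one day; FINAL returns only the latest version of each id (accurate but slower)
SELECT province, eventId, count() AS pv, count(DISTINCT deviceId) AS uv
FROM tb_user_event FINAL
WHERE date = '20210321'
GROUP BY province, eventId;

-- Or trigger a merge manually (a heavy operation, normally left to background merges)
OPTIMIZE TABLE tb_user_event FINAL;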

2. Custom Kafka deserialization schema that generates a unique ID

MyKafkaStringDeserializationSchema

package cn._51doit.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

/**
 * Exposes the topic, partition and offset of each Kafka record
 */
public class MyKafkaStringDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {


    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        String id = topic + "-" + partition + "-" + offset;
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return Tuple2.of(id, value);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}

Add createKafkaStreamV2 to FlinkUtils

package cn._51doit.utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FlinkUtils {

    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static ParameterTool parameterTool;

    public static <T> DataStream<T> createKafkaStream(String[] args, Class<? extends DeserializationSchema<T>> deserializer) throws Exception {

        parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        long checkpointInterval = parameterTool.getLong("", 30000L);
        String checkpointPath = parameterTool.getRequired("");
        env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);

        env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));

        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        List<String> topics = Arrays.asList(parameterTool.getRequired("").split(","));

        Properties properties = parameterTool.getProperties();

        //read data from Kafka

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);

        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        return env.addSource(kafkaConsumer);

    }


    /**
     * Reads the topic, partition and offset of each consumed Kafka record
     * @param args
     * @param deserializer
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T> DataStream<T> createKafkaStreamV2(String[] args, Class<? extends KafkaDeserializationSchema<T>> deserializer) throws Exception {

        parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        long checkpointInterval = parameterTool.getLong("", 30000L);
        String checkpointPath = parameterTool.getRequired("");
        env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
        //alternatively: env.setStateBackend(new FsStateBackend(checkpointPath));
        env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        List<String> topics = Arrays.asList(parameterTool.getRequired("").split(","));

        Properties properties = parameterTool.getProperties();

        //read data from Kafka

        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);

        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        return env.addSource(kafkaConsumer);

    }
}

Calling it from TestKafkaId

package cn._51doit.test;

import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class TestKafkaId {


    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        kafkaStream.print();

        FlinkUtils.env.execute();

    }
}

3. Writing data to ClickHouse with JdbcSink

Add the JDBC connector and the ClickHouse driver

<!-- Flink's standard JDBC sink API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${}</artifactId>
    <version>${}</version>
</dependency>

<!-- ClickHouse JDBC driver -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>

DataToClickhouse

package cn._51doit.jobs;

import cn._51doit.constant.EventID;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.IsNewUserFunctionV2;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Optimization note: key by deviceId and use operator state; even though one partition holds multiple key groups,
 * a single Bloom filter is shared per partition
 */
public class DataToClickhouse {

    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        //parse the JSON data into beans
        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());

        beanStream.map(new MapFunction<DataBean, DataBean>() {

            private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");

            @Override
            public DataBean map(DataBean bean) throws Exception {
                Long timestamp = bean.getTimestamp();
                String format = dateFormat.format(new Date(timestamp));
                String[] fields = format.split("-");
                bean.setDate(fields[0]);
                bean.setHour(fields[1]);
                return bean;
            }
        }).addSink(JdbcSink.sink(
                "insert into tb_user_event2 values (?,?,?,?,?,?,?,?,?,?,?,?)",
                (ps, bean) -> {
                    ps.setString(1, bean.getId());
                    ps.setString(2, bean.getDeviceId());
                    ps.setString(3, bean.getEventId());
                    ps.setInt(4, bean.getIsN());
                    ps.setString(5, bean.getProvince());
                    ps.setString(6, bean.getOsName());
                    ps.setString(7, bean.getReleaseChannel());
                    ps.setString(8, bean.getDeviceType());
                    ps.setLong(9, bean.getTimestamp());
                    ps.setString(10, bean.getDate());
                    ps.setString(11, bean.getHour());
                    //processTime: use the current system time instead of a hard-coded value
                    ps.setString(12, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                },
                JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt(""))
                        .withBatchIntervalMs(FlinkUtils.parameterTool.getInt(""))
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(FlinkUtils.parameterTool.getRequired(""))
                        .withDriverName("")
                        .build()));

        FlinkUtils.env.execute();

    }
}

JsonToBeanFuncV2

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Converts a JSON string into a JavaBean and attaches the unique record ID
 */
public class JsonToBeanFuncV2 extends ProcessFunction<Tuple2<String, String>, DataBean> {

    @Override
    public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {

        try {
            String id = tp.f0; //unique record ID: topic-partition-offset
            DataBean dataBean = JSON.parseObject(tp.f1, DataBean.class);
            dataBean.setId(id);
            out.collect(dataBean);
        } catch (Exception e) {
            //e.printStackTrace();
            //TODO: store malformed records (e.g. via a side output) instead of silently dropping them
        }

    }
}

II. Live-Stream Audience Statistics Requirements

a) Real-time cumulative audience per stream (shown in real time; computed directly in Flink with keyBy)
b) Real-time online viewer count per live room (shown in real time; computed directly in Flink with keyBy)
c) Detail queries across multiple dimensions (write the detail data into ClickHouse)

1. Implementing the audience statistics

Approach 1:
a) Write every single record to Redis/MySQL or push it straight to the dashboard (low latency, but low throughput and heavy load on the database)
b) Submit a second job that writes the detail data to ClickHouse (two jobs, and the same data is read and computed twice)

package cn._51doit.jobs;

import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LiveAudienceCountV2 {
    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());

        SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean bean) throws Exception {
                return bean.getEventId().startsWith("live");
            }
        });

        //keep only the enter/leave live-room events
        SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean value) throws Exception {
                return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
            }
        });
        //cumulative audience per anchor
        //keyed by anchor ID (i.e. per live room)
        //also used for the real-time online viewer count per live room
        KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());

        SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Integer>> res = keyedEnterStream.process(new AnchorDistinctTotalAudienceFunc());

        res.print();

        FlinkUtils.env.execute();

    }

}

AnchorDistinctTotalAudienceFunc

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Real-time per-anchor (per live room) audience statistics: distinct viewers (UV), views (PV) and online count
 */
public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, Tuple4<String, Integer, Integer, Integer>> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    @Override
    public void open(Configuration parameters) throws Exception {

        //state TTL configuration
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();

        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);

        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<BloomFilter<String>>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
                }));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);

        //note: the state name must differ from "uv-state" above, otherwise both handles point to the same state
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);

        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);

    }

    @Override
    public void processElement(DataBean bean, Context ctx, Collector<Tuple4<String, Integer, Integer, Integer>> out) throws Exception {

        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();
        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);

            //online viewer count
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else {
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }
        out.collect(Tuple4.of(ctx.getCurrentKey(), uv, pv, onLineUserCount));
    }
}

Approach 2:
Buffer the aggregates and write them to Redis/MySQL in batches (this is not plain incremental aggregation, and windows do not fit here, so a timer is used instead). Higher latency, but much higher efficiency and far less pressure on the database.
In the same job, also write the detail data to ClickHouse. Data of the same topic (type) should be handled in one job where possible: tag the different outputs and emit them via side outputs. This saves cluster resources and avoids reading and computing the same data twice.

package cn._51doit.jobs;

import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Optional;

public class LiveAudienceCountV2 {
    public static void main(String[] args) throws Exception {

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());

        SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean bean) throws Exception {
                return bean.getEventId().startsWith("live");
            }
        });

        //keep only the enter/leave live-room events
        SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
            @Override
            public boolean filter(DataBean value) throws Exception {
                return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
            }
        });
        //cumulative audience per anchor
        //keyed by anchor ID (i.e. per live room)
        //also used for the real-time online viewer count per live room
        KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());

        AnchorDistinctTotalAudienceFunc anchorFunc = new AnchorDistinctTotalAudienceFunc();

        //main stream: detail records, written to ClickHouse
        SingleOutputStreamOperator<DataBean> mainStream = keyedEnterStream.process(anchorFunc);

        //side output: aggregated counts, written to Redis
        DataStream<Tuple4<String, Integer, Integer, Integer>> aggDataStream = mainStream.getSideOutput(anchorFunc.getAggTag());


        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").setDatabase(5).build();

        aggDataStream.addSink(new RedisSink<>(conf, new AudienceCountMapper()));

        mainStream.addSink(JdbcSink.sink(
                "insert into tb_anchor_audience_count (id, anchorId, deviceId, eventId, province, os, channel, deviceType, eventTime, date, hour) values (?,?,?,?,?,?,?,?,?,?,?)",
                (ps, bean) -> {
                    ps.setString(1, bean.getId());
                    ps.setString(2, bean.getProperties().get("anchor_id").toString());
                    ps.setString(3, bean.getDeviceId());
                    ps.setString(4, bean.getEventId());
                    ps.setString(5, bean.getProvince());
                    ps.setString(6, bean.getOsName());
                    ps.setString(7, bean.getReleaseChannel());
                    ps.setString(8, bean.getDeviceType());
                    ps.setLong(9, bean.getTimestamp());
                    ps.setString(10, bean.getDate());
                    ps.setString(11, bean.getHour());
                },
                JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt(""))
                        .withBatchIntervalMs(FlinkUtils.parameterTool.getInt(""))
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(FlinkUtils.parameterTool.getRequired(""))
                        .withDriverName("")
                        .build()));



        FlinkUtils.env.execute();

    }


    public static class AudienceCountMapper implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {


        //Redis hash: audience-count -> {anchorId_date : "uv,pv,online"}
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "audience-count");
        }

        @Override
        public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> tp) {
            return tp.f0; //aid_20210321
        }

        @Override
        public String getValueFromData(Tuple4<String, Integer, Integer, Integer> tp) {
            return tp.f1 + "," + tp.f2 + "," + tp.f3;
        }
    }


}

AnchorDistinctTotalAudienceFunc

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;

/**
 * Real-time per-anchor (per live room) audience statistics: distinct viewers (UV), views (PV) and online count
 */
public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {

    private transient ValueState<Integer> uvState;
    private transient ValueState<Integer> pvState;
    private transient ValueState<BloomFilter<String>> bloomFilterState;
    private transient ValueState<Integer> onLineUserState;

    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");

    private OutputTag<Tuple4<String, Integer, Integer, Integer>> aggTag = new OutputTag<Tuple4<String, Integer, Integer, Integer>>("agg-tag"){};

    @Override
    public void open(Configuration parameters) throws Exception {

        //state TTL configuration
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(6))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .neverReturnExpired()
                .build();

        ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
        uvStateDescriptor.enableTimeToLive(stateTtlConfig);

        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
                new ValueStateDescriptor<BloomFilter<String>>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
                }));
        bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);

        //note: the state name must differ from "uv-state" above, otherwise both handles point to the same state
        ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class);
        onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);

        uvState = getRuntimeContext().getState(uvStateDescriptor);
        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
        onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);

    }


    @Override
    public void processElement(DataBean bean, Context ctx, Collector<DataBean> out) throws Exception {

        Integer onLineUserCount = onLineUserState.value();
        String deviceId = bean.getDeviceId();
        Integer uv = uvState.value();
        Integer pv = pvState.value();
        BloomFilter<String> bloomFilter = bloomFilterState.value();
        String eventId = bean.getEventId();

        //register a processing-time timer aligned to the next 10-second boundary
        long currentProcessingTime = ctx.timerService().currentProcessingTime();
        long fireTime = currentProcessingTime - currentProcessingTime % 10000 + 10000;
        ctx.timerService().registerProcessingTimeTimer(fireTime);

        if (onLineUserCount == null) {
            onLineUserCount = 0;
        }
        if ("liveEnter".equals(eventId)) {
            if (bloomFilter == null) {
                bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
                pv = 0;
                uv = 0;
            }
            if (!bloomFilter.mightContain(deviceId)) {
                bloomFilter.put(deviceId);
                uv++;
                bloomFilterState.update(bloomFilter);
                uvState.update(uv);
            }
            pv++;
            pvState.update(pv);

            //online viewer count
            onLineUserCount++;
            onLineUserState.update(onLineUserCount);
        } else {
            onLineUserCount--;
            onLineUserState.update(onLineUserCount);
        }

        String format = dateFormat.format(bean.getTimestamp());
        String[] fields = format.split("-");
        bean.setDate(fields[0]);
        bean.setHour(fields[1]);
        //emit the detail record (main stream)
        out.collect(bean);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataBean> out) throws Exception {

        String date = dateFormat.format(timestamp).split("-")[0];


        ctx.output(aggTag, Tuple4.of(ctx.getCurrentKey() + "_" + date, uvState.value(), pvState.value(), onLineUserState.value()));

    }


    public OutputTag<Tuple4<String, Integer, Integer, Integer>> getAggTag() {
        return aggTag;
    }
}

2. Live-Room Popularity Score

  • The viewer stays in the live room for at least 1 minute
  • Within 30 minutes, repeated entries into the same live room by the same device ID count as one popularity point

Implementation ideas:
Slice sliding windows by EventTime, or
use a ProcessFunction and register timers (the approach used below)

package cn._51doit.jobs;

import cn._51doit.pojo.DataBean;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class LiveAudienceCount {
    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());

        KeyedStream<DataBean, Tuple2<String, String>> keyed = beanStream.keyBy(new KeySelector<DataBean, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(DataBean bean) throws Exception {
                String deviceId = bean.getDeviceId();
                String anchor_id = bean.getProperties().get("anchor_id").toString();
                return Tuple2.of(anchor_id, deviceId);
            }
        });

        keyed.process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>() {

            private transient ValueState<Long> inState; //timestamp of the enter event

            private transient ValueState<Long> outState; //timestamp of the leave event

            @Override
            public void open(Configuration parameters) throws Exception {

                ValueStateDescriptor<Long> inStateDescriptor = new ValueStateDescriptor<>("in-state", Long.class);
                inState = getRuntimeContext().getState(inStateDescriptor);
                ValueStateDescriptor<Long> outStateDescriptor = new ValueStateDescriptor<>("out-state", Long.class);
                outState = getRuntimeContext().getState(outStateDescriptor);
            }

            @Override
            public void processElement(DataBean bean, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Long timestamp = bean.getTimestamp();
                String eventId = bean.getEventId();
                if ("liveEnter".equals(eventId)) {
                    inState.update(timestamp);
                    //register a timer that fires one minute after entering the room
                    ctx.timerService().registerProcessingTimeTimer(timestamp + 60000 + 1);
                } else if ("liveLeave".equals(eventId)) {
                    Long inTime = inState.value();
                    outState.update(timestamp);
                    if (timestamp - inTime < 60000) {
                        //stayed less than one minute: delete the timer
                        ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
                    }

                }

            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Long outTime = outState.value();

                if (outTime == null) {
                    out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
                } else {
                    Long inTime = inState.value();
                    if (inTime - outTime > 30 * 60000) { //more than 30 minutes since the last leave
                        out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
                    }
                }
                //e.g. entered at 14:00 and left at 14:29, then re-entered after 15:00:
                //the gap exceeds 30 minutes, so it counts as another popularity point
            }
        }).print();

        FlinkUtils.env.execute();
    }

}

3. Persisting the audience statistics results

Create the table:

CREATE TABLE tb_user_event2
(
    `id` String comment 'unique record id',
    `anchorId` String comment 'anchor ID',
    `deviceId` String comment 'device (user) ID',
    `eventId` String comment 'event ID',
    `os` String comment 'OS name',
    `province` String comment 'province',
    `channel` String comment 'download channel',
    `deviceType` String comment 'device type',
    `eventTime` DateTime64 comment 'event time carried in the data',
    `date` String comment 'eventTime formatted as yyyyMMdd',
    `hour` String comment 'eventTime formatted as HH',
    `processTime` DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;

The write-side code is in Approach 2 above.
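
A hedged example of the multi-dimensional detail query this table enables (requirement c above); the date value '20210321' is only an illustrative placeholder:

-- Per-anchor, per-province UV/PV of live-room entries for one day
SELECT anchorId, province, count(DISTINCT deviceId) AS uv, count() AS pv
FROM tb_user_event2 FINAL
WHERE date = '20210321' AND eventId = 'liveEnter'
GROUP BY anchorId, province;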

III. Gift Reward Requirements Analysis

MySQL also holds a gift table (a dimension table) that has to be joined with the event stream. Common ways to join a dimension table:

a) Query the database for every incoming record (slow, low throughput)
b) Use async I/O (faster, but consumes more resources)
c) Broadcast state (fastest; suited to small dimension tables whose data may change)

The gift table is small and its data can change (a gift may be disabled, a new gift may be added), so broadcast state is a good fit.

Metrics to compute:
1. Per-anchor number of gifts received and total gift points
2. Number of gifts rewarded and a top-N of the most popular gifts (the detail data can be written to a database and sorted at query time; see the query sketch after this list)
3. Multi-dimensional statistics (ClickHouse)
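
For metric 2, a sketch of the "sort at query time" idea, assuming the reward events are also written to a ClickHouse detail table. The table name tb_gift_reward_detail, its columns and the date value are hypothetical; no such table is defined in this article.

-- Top 10 most popular gifts for one day, ranked at query time
SELECT giftName, count() AS cnt
FROM tb_gift_reward_detail FINAL
WHERE date = '20210321'
GROUP BY giftName
ORDER BY cnt DESC
LIMIT 10;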

1. Per-anchor (per live room) gift points (coins)

Metric: the number of gifts each anchor receives and the total gift points.

The gift table is read with a JDBC source that keeps polling for changes;
the behavior data comes from the Kafka source.

package cn._51doit.jobs;

import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.source.MySQLSource;
import cn._51doit.udf.GiftConnectFunction;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GiftPointsCount {

    public static void main(String[] args) throws Exception {

        DataStreamSource<Tuple4<Integer, String, Double, Integer>> mysqlStream = FlinkUtils.env.addSource(new MySQLSource());

        MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor = new MapStateDescriptor<>("gift-broadcast-state", TypeInformation.of(Integer.class), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
        }));

        BroadcastStream<Tuple4<Integer, String, Double, Integer>> broadcastStream = mysqlStream.broadcast(stateDescriptor);

        DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());

        SingleOutputStreamOperator<DataBean> filterdStream = beanStream.filter(bean -> "liveReward".equals(bean.getEventId()));

        SingleOutputStreamOperator<Tuple3<String, String, Double>> anchorIdNamePointsStream = filterdStream.connect(broadcastStream).process(new GiftConnectFunction(stateDescriptor));

        //key by anchor ID and sum the points
        SingleOutputStreamOperator<Tuple3<String, String, Double>> sum = anchorIdNamePointsStream.keyBy(tp -> tp.f0).sum(2);

        sum.print();

        FlinkUtils.env.execute();

    }
}

MySQLSource

package cn._51doit.source;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;

public class MySQLSource extends RichSourceFunction<Tuple4<Integer, String, Double, Integer>> {

    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private boolean flag = true;


    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit_mall?characterEncoding=utf-8", "root", "123456");
    }


    @Override
    public void run(SourceContext<Tuple4<Integer, String, Double, Integer>> ctx) throws Exception {

        long timestamp = 0;

        while (flag) {
            String sql = "SELECT id, name, points, deleted FROM tb_live_gift WHERE updateTime > ?" + (timestamp == 0 ? " AND deleted = 0" : "");
            preparedStatement = connection.prepareStatement(sql);
            preparedStatement.setDate(1, new Date(timestamp));
            ResultSet resultSet = preparedStatement.executeQuery();
            timestamp = System.currentTimeMillis();
            while (resultSet.next()) {

                int id = resultSet.getInt("id");
                String name = resultSet.getString("name");
                double points = resultSet.getDouble("points");
                int deleted = resultSet.getInt("deleted");
                ctx.collect(Tuple4.of(id, name, points, deleted));
            }
            resultSet.close();
            preparedStatement.close();
            Thread.sleep(10000);
        }


    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        preparedStatement.close();
        connection.close();
    }
}

GiftConnectFunction

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GiftConnectFunction extends BroadcastProcessFunction<DataBean, Tuple4<Integer, String, Double, Integer>, Tuple3<String, String, Double>> {

    private MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor;

    public GiftConnectFunction(MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void processElement(DataBean value, ReadOnlyContext ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
        ReadOnlyBroadcastState<Integer, Tuple2<String, Double>> broadcastState = ctx.getBroadcastState(stateDescriptor);

        String anchorId = value.getProperties().get("anchor_id").toString();
        Integer giftId = Integer.parseInt(value.getProperties().get("gift_id").toString());
        Tuple2<String, Double> tp = broadcastState.get(giftId);
        if (tp != null) {
            out.collect(Tuple3.of(anchorId, tp.f0, tp.f1));
        } else {
            //gift not yet in the broadcast state: emit the gift id with 0 points so the downstream sum does not hit a null
            out.collect(Tuple3.of(anchorId, giftId.toString(), 0.0));
        }
    }

    @Override
    public void processBroadcastElement(Tuple4<Integer, String, Double, Integer> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {

        BroadcastState<Integer, Tuple2<String, Double>> broadcastState = ctx.getBroadcastState(stateDescriptor);
        Integer id = value.f0;
        String name = value.f1;
        Double points = value.f2;
        Integer deleted = value.f3;
        if(deleted == 0) {
            broadcastState.put(id, Tuple2.of(name, points));
        } else {
            broadcastState.remove(id);
        }
    }

}

2. Hot-Product Top-N Requirements

Metric: every 1 minute, compute the hot products (product IDs) of the last 10 minutes, per category and per event type.

[20:00, 20:10], phones, view, Huawei P40, 50000
[20:00, 20:10], phones, view, vivo s27, 3000
[20:00, 20:10], phones, view, mi11, 2000

HotGoodsCount

package cn._51doit.jobs;

import cn._51doit.pojo.DataBean;
import cn._51doit.pojo.ItemEventCount;
import cn._51doit.udf.HotGoodsAggregateFunction;
import cn._51doit.udf.HotGoodsTopNFunction;
import cn._51doit.udf.HotGoodsWindowFunction;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;

public class HotGoodsCount {

    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);

        SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());

        SingleOutputStreamOperator<DataBean> filteredStream = beanStream.filter(bean -> bean.getEventId().startsWith("product"));

        //assign timestamps and watermarks so windows can be based on EventTime
        SingleOutputStreamOperator<DataBean> beanStreamWithWaterMark = filteredStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DataBean>forBoundedOutOfOrderness(Duration.ofMillis(5000))
                .withTimestampAssigner((bean, ts) -> bean.getTimestamp()));


        KeyedStream<DataBean, Tuple3<String, String, String>> keyedStream = beanStreamWithWaterMark.keyBy(new KeySelector<DataBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> getKey(DataBean value) throws Exception {
                String eventId = value.getEventId();
                String categoryId = value.getProperties().get("category_id").toString();
                String productId = value.getProperties().get("product_id").toString();
                return Tuple3.of(eventId, categoryId, productId);
            }
        });

        WindowedStream<DataBean, Tuple3<String, String, String>, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

        //use incremental aggregation instead of buffering the whole window
        //output: product ID, event ID, category ID, count, window start, window end
        SingleOutputStreamOperator<ItemEventCount> aggRes = window.aggregate(new HotGoodsAggregateFunction(), new HotGoodsWindowFunction());

        //next, rank within each group
        //records with the same category ID, event ID and window go to the same group
        KeyedStream<ItemEventCount, Tuple4<String, String, Long, Long>> keyedAggStream = aggRes.keyBy(new KeySelector<ItemEventCount, Tuple4<String, String, Long, Long>>() {
            @Override
            public Tuple4<String, String, Long, Long> getKey(ItemEventCount value) throws Exception {
                return Tuple4.of(value.categoryId, value.eventId, value.windowStart, value.windowEnd);
            }
        });

        SingleOutputStreamOperator<List<ItemEventCount>> process = keyedAggStream.process(new HotGoodsTopNFunction());

        process.print();

        FlinkUtils.env.execute();
    }
}

HotGoodsTopNFunction

package cn._51doit.udf;

import cn._51doit.pojo.ItemEventCount;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class HotGoodsTopNFunction extends KeyedProcessFunction<Tuple4<String, String, Long, Long>, ItemEventCount, List<ItemEventCount>> {


    private transient ValueState<List<ItemEventCount>> listValueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<List<ItemEventCount>> stateDescriptor = new ValueStateDescriptor<>("item-list-state", TypeInformation.of(new TypeHint<List<ItemEventCount>>() {
        }));
        listValueState = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(ItemEventCount value, Context ctx, Collector<List<ItemEventCount>> out) throws Exception {

        List<ItemEventCount> lst = listValueState.value();
        if(lst == null) {
            lst = new ArrayList<>();
        }
        lst.add(value);
        listValueState.update(lst);
        //register an event-time timer that fires just after the window ends
        //long currentWatermark = ctx.timerService().currentWatermark();
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);

    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemEventCount>> out) throws Exception {

        //sort, then emit
        List<ItemEventCount> lst = listValueState.value();

        lst.sort((a, b) -> Long.compare(b.count, a.count));

        //copy the top 3 into a new list (subList is only a view of the backing list)
        List<ItemEventCount> sortedList = new ArrayList<>(lst.subList(0, Math.min(3, lst.size())));

        //clear the state once the sorted result has been taken
        listValueState.clear();

        out.collect(sortedList);

    }
}

HotGoodsAggregateFunction

package cn._51doit.udf;

import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;

public class HotGoodsAggregateFunction implements AggregateFunction<DataBean, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(DataBean value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        //merge two partial counts
        return a + b;
    }
}

HotGoodsWindowFunction

package cn._51doit.udf;

import cn._51doit.pojo.ItemEventCount;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HotGoodsWindowFunction implements WindowFunction<Long, ItemEventCount, Tuple3<String, String, String>, TimeWindow> {


    //called once per key when the window fires, receiving the incrementally aggregated result for that window
    @Override
    public void apply(Tuple3<String, String, String> tp, TimeWindow window, Iterable<Long> input, Collector<ItemEventCount> out) throws Exception {
        String eventId = tp.f0;
        String categoryId = tp.f1;
        String productId = tp.f2;
        Long count = input.iterator().next();
        long start = window.getStart();
        long end = window.getEnd();
        out.collect(new ItemEventCount(productId, eventId, categoryId, count, start, end));
    }
}

ItemEventCount

package cn._51doit.pojo;

public class ItemEventCount {

    public String productId;   // product ID
    public String eventId;     // event type
    public String categoryId;  // product category ID
    public long count;         // number of events for this product
    public long windowStart;   // window start timestamp
    public long windowEnd;     // window end timestamp

    public ItemEventCount(String productId, String eventId, String categoryId, long count, long windowStart, long windowEnd) {
        this.productId = productId;
        this.eventId = eventId;
        this.categoryId = categoryId;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "ItemEventCount{" +
                "productId='" + productId + '\'' +
                ", eventId='" + eventId + '\'' +
                ", categoryId='" + categoryId + '\'' +
                ", count=" + count +
                ", windowStart=" + windowStart +
                ", windowEnd=" + windowEnd +
                '}';
    }
}