如果数据量大、维度多,用keyBy统计并不方便,建议把数据写到外部实时数仓里:ClickHouse擅长实时查询,Flink擅长实时处理。
一、多维度复杂统计(使用Clickhouse)
使用的是ClickHouse的ReplacingMergeTree:它可以将同一个分区中ID相同的数据进行merge,只保留最新的一条,利用这个特点可以让Flink + ClickHouse(勉强)实现数据一致性。
存在的问题:写入到ClickHouse中的数据不会立即merge,需要手动optimize或等待后台自动合并。
解决方案:查询时在表名的后面加上FINAL关键字,就只查合并后的最新数据,但查询效率会变低(查询示例见下文建表语句之后)。
如何设计clickhouse的表?
1.可以支持维度查询(大宽表)
2.按照时间段进行查询(将时间作为表的字段并且建分区表)
3.可以统计出PV、UV(去重查询)
4.支持分区(按照时间进行分区)
5.支持覆盖(ReplacingMergeTree)(对查询结果准确性要求高的,表名后面加final)
6.如何生成一个唯一的ID(在Kafka中生成唯一的ID:topic+分区+偏移量)
7.相同的数据要进入到相同的分区(按照数据的时间即EventTime进行分区)
1、建表
CREATE TABLE tb_user_event
(
`id` String comment '数据唯一id,使用Kafka的topic+分区+偏移量',
`deviceId` String comment '设备ID',
`eventId` String comment '事件ID',
`isNew` UInt8 comment '是否是新用户1为新,0为老',
`os` String comment '系统名称',
`province` String comment '省份',
`channel` String comment '下载渠道',
`eventTime` DateTime64 comment '数据中所携带的时间',
`date` String comment 'eventTime转成YYYYMMDD格式',
`hour` String comment 'eventTime转成HH格式',
`processTime` DateTime comment '插入到数据库时的系统时间'
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
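下面补一个查询示意(仅为草图:ClickHouse地址、库名和查询条件均为示例值,需按实际环境调整),演示在表名后加FINAL只读取ReplacingMergeTree合并后的最新数据,使用的是后文才引入的clickhouse-jdbc驱动:
package cn._51doit.test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * 查询示意:表名后加FINAL,只读取合并后的最新数据(结果准确但效率变低)
 */
public class QueryWithFinalDemo {
public static void main(String[] args) throws Exception {
//clickhouse-jdbc 0.2.4 对应的驱动类
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
Statement stmt = conn.createStatement()) {
String sql = "SELECT eventId, count(DISTINCT deviceId) AS uv, count(*) AS pv " +
"FROM tb_user_event FINAL WHERE date = '20210318' GROUP BY eventId";
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
System.out.println(rs.getString("eventId") + " -> uv:" + rs.getLong("uv") + ", pv:" + rs.getLong("pv"));
}
}
}
}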
2、自定义Kafka反序列化器生成唯一ID
MyKafkaStringDeserializationSchema
package cn._51doit.kafka;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;
/**
* 可以读取kafka数据中的topic、partition、offset
*/
public class MyKafkaStringDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
@Override
public boolean isEndOfStream(Tuple2<String, String> nextElement) {
return false;
}
@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
String id = topic + "-" + partition + "-" + offset;
String value = new String(record.value(), StandardCharsets.UTF_8);
return Tuple2.of(id, value);
}
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
}
}
FlinkUtils新增createKafkaStreamV2
package cn._51doit.utils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class FlinkUtils {
public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static ParameterTool parameterTool;
public static <T> DataStream<T> createKafkaStream(String[] args, Class<? extends DeserializationSchema<T>> deserializer) throws Exception {
parameterTool = ParameterTool.fromPropertiesFile(args[0]);
long checkpointInterval = parameterTool.getLong("", 30000L);
String checkpointPath = parameterTool.getRequired("");
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
List<String> topics = Arrays.asList(parameterTool.getRequired("").split(","));
Properties properties = parameterTool.getProperties();
//从Kafka中读取数据
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(kafkaConsumer);
}
/**
* 可以读取Kafka消费数据中的topic、partition、offset
* @param args
* @param deserializer
* @param <T>
* @return
* @throws Exception
*/
public static <T> DataStream<T> createKafkaStreamV2(String[] args, Class<? extends KafkaDeserializationSchema<T>> deserializer) throws Exception {
parameterTool = ParameterTool.fromPropertiesFile(args[0]);
long checkpointInterval = parameterTool.getLong("", 30000L);
String checkpointPath = parameterTool.getRequired("");
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
//env.setStateBackend(new FsStateBackend(checkpointPath));
env.setStateBackend(new RocksDBStateBackend(checkpointPath, true));
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
List<String> topics = Arrays.asList(parameterTool.getRequired("").split(","));
Properties properties = parameterTool.getProperties();
//从Kafka中读取数据
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializer.newInstance(), properties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
return env.addSource(kafkaConsumer);
}
}
调用TestKafkaId
package cn._51doit.test;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
public class TestKafkaId {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
kafkaStream.print();
FlinkUtils.env.execute();
}
}
3、使用JdbcSink将数据写入clickhouse
导入clickhouse驱动
<!-- flink sink 的 jdbc标准API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${}</artifactId>
<version>${}</version>
</dependency>
<!--clickhouse的连接jdbc驱动 -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
package cn._51doit.jobs;
import cn._51doit.constant.EventID;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.IsNewUserFunctionV2;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 优化:按照deviceID进行KeyBy,使用OperatorState,虽然一个分区中有多个组,但是在一个分区中只使用一个布隆过滤器
*/
public class DataToClickhouse {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
//解析数据
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());
beanStream.map(new MapFunction<DataBean, DataBean>() {
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");
@Override
public DataBean map(DataBean bean) throws Exception {
Long timestamp = bean.getTimestamp();
String format = dateFormat.format(new Date(timestamp));
String[] fields = format.split("-");
bean.setDate(fields[0]);
bean.setHour(fields[1]);
return bean;
}
}).addSink(JdbcSink.sink(
"insert into tb_user_event2 values (?,?,?,?,?,?,?,?,?,?,?,?)",
(ps, bean) -> {
ps.setString(1, bean.getId());
ps.setString(2, bean.getDeviceId());
ps.setString(3, bean.getEventId());
ps.setInt(4, bean.getIsNew());
ps.setString(5, bean.getProvince());
ps.setString(6, bean.getOsName());
ps.setString(7, bean.getReleaseChannel());
ps.setString(8, bean.getDeviceType());
ps.setLong(9, bean.getTimestamp());
ps.setString(10, bean.getDate());
ps.setString(11, bean.getHour());
ps.setString(12, "2021-03-18 00:00:00");
},
JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt(""))
.withBatchIntervalMs(FlinkUtils.parameterTool.getInt(""))
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FlinkUtils.parameterTool.getRequired(""))
.withDriverName("")
.build()));
FlinkUtils.env.execute();
}
}
JsonToBeanFuncV2
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
* 将JSON字符串转成JavaBean
*/
public class JsonToBeanFuncV2 extends ProcessFunction<Tuple2<String, String>, DataBean> {
@Override
public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {
try {
String id = tp.f0; //数据唯一ID,topic-partition-offset
DataBean dataBean = JSON.parseObject(tp.f1, DataBean.class);
dataBean.setId(id);
out.collect(dataBean);
} catch (Exception e) {
//e.printStackTrace();
//TODO 将有问题的数据保存起来
}
}
}
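针对上面catch中的TODO(将有问题的数据保存起来),下面给出一种可行的写法示意(仅为草图,类名和OutputTag名称均为示例):借助侧流输出把解析失败的原始JSON单独输出,后续可以再接Sink落盘或写回Kafka。
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * 示意:解析失败的数据通过侧流输出保存,而不是直接丢弃
 */
public class JsonToBeanFuncWithDirtyTag extends ProcessFunction<Tuple2<String, String>, DataBean> {
//脏数据的侧流标签(名称为示例)
public static final OutputTag<String> DIRTY_TAG = new OutputTag<String>("dirty-json"){};
@Override
public void processElement(Tuple2<String, String> tp, Context ctx, Collector<DataBean> out) throws Exception {
try {
DataBean dataBean = JSON.parseObject(tp.f1, DataBean.class);
dataBean.setId(tp.f0);
out.collect(dataBean);
} catch (Exception e) {
//解析失败的原始数据进入侧流,主流不受影响
ctx.output(DIRTY_TAG, tp.f1);
}
}
}
调用方可以通过 beanStream.getSideOutput(JsonToBeanFuncWithDirtyTag.DIRTY_TAG) 拿到脏数据流再单独处理。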
二、观看直播人数统计需求
a)实时统计累计观众(实时展示,直接用flink keyBy统计)
b)实时统计各个直播间在线人数(实时展示,直接用flink keyBy统计)
c)查看多个维度的明细(将数据写入到clickhouse中)
1、观看直播人数统计实现
实现方式一:
a)将数据来一条就写入到Redis/MySQL或大屏展示(延迟低、但是效率低、对数据库压力大)
b)再写一个job将各种明细数据写入到ClickHouse中(提交了2个job、数据重复计算)
package cn._51doit.jobs;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class LiveAudienceCountV2 {
public static void main(String[] args) throws Exception {
DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());
SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean bean) throws Exception {
return bean.getEventId().startsWith("live");
}
});
//按照进入直播间的事件ID进行过滤
SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean value) throws Exception {
return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
}
});
//统计各个主播的累计观看人数
//按照主播ID(按照直播间)
//统计各个主播直播间实时在线人数
KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Integer>> res = keyedEnterStream.process(new AnchorDistinctTotalAudienceFunc());
res.print();
FlinkUtils.env.execute();
}
}
AnchorDistinctTotalAudienceFunc
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 实时统计各个直播间主播的观看人数和次数
*/
public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, Tuple4<String, Integer, Integer, Integer>> {
private transient ValueState<Integer> uvState;
private transient ValueState<Integer> pvState;
private transient ValueState<BloomFilter<String>> bloomFilterState;
private transient ValueState<Integer> onLineUserState;
@Override
public void open(Configuration parameters) throws Exception {
//设置状态的TTL
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(Time.hours(6))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.neverReturnExpired()
.build();
ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
uvStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
pvStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
new ValueStateDescriptor<BloomFilter<String>>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
}));
bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class); //状态名不能与上面的uv-state重复
onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);
uvState = getRuntimeContext().getState(uvStateDescriptor);
pvState = getRuntimeContext().getState(pvStateDescriptor);
bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);
}
@Override
public void processElement(DataBean bean, Context ctx, Collector<Tuple4<String, Integer, Integer, Integer>> out) throws Exception {
Integer onLineUserCount = onLineUserState.value();
String deviceId = bean.getDeviceId();
Integer uv = uvState.value();
Integer pv = pvState.value();
BloomFilter<String> bloomFilter = bloomFilterState.value();
String eventId = bean.getEventId();
if (onLineUserCount == null) {
onLineUserCount = 0;
}
if ("liveEnter".equals(eventId)) {
if (bloomFilter == null) {
bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
pv = 0;
uv = 0;
}
if (!bloomFilter.mightContain(deviceId)) {
bloomFilter.put(deviceId);
uv++;
bloomFilterState.update(bloomFilter);
uvState.update(uv);
}
pv++;
pvState.update(pv);
//累计在线人数
onLineUserCount++;
onLineUserState.update(onLineUserCount);
} else {
onLineUserCount--;
onLineUserState.update(onLineUserCount);
}
out.collect(Tuple4.of(ctx.getCurrentKey(), uv, pv, onLineUserCount));
}
}
实现方式二:
将数据攒起来批量(不是简单的增量聚合,不能使用窗口,而是使用定时器)写入到Redis/MySQL(延迟高、效率高、对数据库的压力小)
在同一个job中,将数据写入到Clickhouse中(同一个主题(类型)的数据尽量在一个job中完成,将不同的数据打上不同的标签,侧流输出,可以节省集群资源。避免数据重复读取和计算)
package cn._51doit.jobs;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.AnchorDistinctTotalAudienceFunc;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Optional;
public class LiveAudienceCountV2 {
public static void main(String[] args) throws Exception {
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());
SingleOutputStreamOperator<DataBean> liveDataStream = beanStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean bean) throws Exception {
return bean.getEventId().startsWith("live");
}
});
//按照进入直播间的事件ID进行过滤
SingleOutputStreamOperator<DataBean> enterStream = liveDataStream.filter(new FilterFunction<DataBean>() {
@Override
public boolean filter(DataBean value) throws Exception {
return "liveEnter".equals(value.getEventId()) || "liveLeave".equals(value.getEventId());
}
});
//统计各个主播的累计观看人数
//按照主播ID(按照直播间)
//统计各个主播直播间实时在线人数
KeyedStream<DataBean, String> keyedEnterStream = enterStream.keyBy(bean -> bean.getProperties().get("anchor_id").toString());
AnchorDistinctTotalAudienceFunc anchorFunc = new AnchorDistinctTotalAudienceFunc();
//主流写入clickhouse
SingleOutputStreamOperator<DataBean> mainStream = keyedEnterStream.process(anchorFunc);
//非主流(聚合的数据),写入redis
DataStream<Tuple4<String, Integer, Integer, Integer>> aggDataStream = mainStream.getSideOutput(anchorFunc.getAggTag());
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").setDatabase(5).build();
aggDataStream.addSink(new RedisSink<>(conf, new AudienceCountMapper()));
mainStream.addSink(JdbcSink.sink(
"insert into tb_anchor_audience_count (id, anchorId, deviceId, eventId, province, os, channel, deviceType, eventTime, date, hour) values (?,?,?,?,?,?,?,?,?,?,?)",
(ps, bean) -> {
ps.setString(1, bean.getId());
ps.setString(2, bean.getProperties().get("anchor_id").toString());
ps.setString(3, bean.getDeviceId());
ps.setString(4, bean.getEventId());
ps.setString(5, bean.getProvince());
ps.setString(6, bean.getOsName());
ps.setString(7, bean.getReleaseChannel());
ps.setString(8, bean.getDeviceType());
ps.setLong(9, bean.getTimestamp());
ps.setString(10, bean.getDate());
ps.setString(11, bean.getHour());
},
JdbcExecutionOptions.builder().withBatchSize(FlinkUtils.parameterTool.getInt(""))
.withBatchIntervalMs(FlinkUtils.parameterTool.getInt(""))
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FlinkUtils.parameterTool.getRequired(""))
.withDriverName("")
.build()));
FlinkUtils.env.execute();
}
public static class AudienceCountMapper implements RedisMapper<Tuple4<String, Integer, Integer, Integer>> {
//Redis中的结构:audience-count -> {(主播ID_日期, "uv,pv,在线人数")}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "audience-count");
}
@Override
public String getKeyFromData(Tuple4<String, Integer, Integer, Integer> tp) {
return tp.f0; //aid_20210321
}
@Override
public String getValueFromData(Tuple4<String, Integer, Integer, Integer> tp) {
return tp.f1 + "," + tp.f2 + "," + tp.f3;
}
}
}
AnchorDistinctTotalAudienceFunc
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
/**
* 实时统计各个直播间主播的观看人数和次数
*/
public class AnchorDistinctTotalAudienceFunc extends KeyedProcessFunction<String, DataBean, DataBean> {
private transient ValueState<Integer> uvState;
private transient ValueState<Integer> pvState;
private transient ValueState<BloomFilter<String>> bloomFilterState;
private transient ValueState<Integer> onLineUserState;
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd-HH");
private OutputTag<Tuple4<String, Integer, Integer, Integer>> aggTag = new OutputTag<Tuple4<String, Integer, Integer, Integer>>("agg-tag"){};
@Override
public void open(Configuration parameters) throws Exception {
//设置状态的TTL
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(Time.hours(6))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.neverReturnExpired()
.build();
ValueStateDescriptor<Integer> uvStateDescriptor = new ValueStateDescriptor<>("uv-state", Integer.class);
uvStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv-state", Integer.class);
pvStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<BloomFilter<String>> bloomFilterStateDescriptor =
new ValueStateDescriptor<BloomFilter<String>>("bloom-filter-state", TypeInformation.of(new TypeHint<BloomFilter<String>>() {
}));
bloomFilterStateDescriptor.enableTimeToLive(stateTtlConfig);
ValueStateDescriptor<Integer> onLineUserStateDescriptor = new ValueStateDescriptor<>("online-user-state", Integer.class); //状态名不能与上面的uv-state重复
onLineUserStateDescriptor.enableTimeToLive(stateTtlConfig);
uvState = getRuntimeContext().getState(uvStateDescriptor);
pvState = getRuntimeContext().getState(pvStateDescriptor);
bloomFilterState = getRuntimeContext().getState(bloomFilterStateDescriptor);
onLineUserState = getRuntimeContext().getState(onLineUserStateDescriptor);
}
@Override
public void processElement(DataBean bean, Context ctx, Collector<DataBean> out) throws Exception {
Integer onLineUserCount = onLineUserState.value();
String deviceId = bean.getDeviceId();
Integer uv = uvState.value();
Integer pv = pvState.value();
BloomFilter<String> bloomFilter = bloomFilterState.value();
String eventId = bean.getEventId();
//注册定时器
long currentProcessingTime = ctx.timerService().currentProcessingTime();
long fireTime = currentProcessingTime - currentProcessingTime % 10000 + 10000;
ctx.timerService().registerProcessingTimeTimer(fireTime);
if (onLineUserCount == null) {
onLineUserCount = 0;
}
if ("liveEnter".equals(eventId)) {
if (bloomFilter == null) {
bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 1000000);
pv = 0;
uv = 0;
}
if (!bloomFilter.mightContain(deviceId)) {
bloomFilter.put(deviceId);
uv++;
bloomFilterState.update(bloomFilter);
uvState.update(uv);
}
pv++;
pvState.update(pv);
//累计在线人数
onLineUserCount++;
onLineUserState.update(onLineUserCount);
} else {
onLineUserCount--;
onLineUserState.update(onLineUserCount);
}
String format = dateFormat.format(bean.getTimestamp());
String[] fields = format.split("-");
bean.setDate(fields[0]);
bean.setHour(fields[1]);
//输出明细数据(主流)
out.collect(bean);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<DataBean> out) throws Exception {
String date = dateFormat.format(timestamp).split("-")[0];
ctx.output(aggTag, Tuple4.of(ctx.getCurrentKey() + "_" + date, uvState.value(), pvState.value(), onLineUserState.value()));
}
public OutputTag<Tuple4<String, Integer, Integer, Integer>> getAggTag() {
return aggTag;
}
}
2、直播人气值计算
- 在直播间中至少停留1分钟
- 在30分钟之内,同一设备ID频繁进出该直播间,只累计一次人气值
实现思路(下面的代码采用第二种):
按照EventTime划分滑动窗口
使用ProcessFunction注册定时器
package cn._51doit.jobs;
import cn._51doit.pojo.DataBean;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class LiveAudienceCount {
public static void main(String[] args) throws Exception {
DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());
KeyedStream<DataBean, Tuple2<String, String>> keyed = beanStream.keyBy(new KeySelector<DataBean, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(DataBean bean) throws Exception {
String deviceId = bean.getDeviceId();
String anchor_id = bean.getProperties().get("anchor_id").toString();
return Tuple2.of(anchor_id, deviceId);
}
});
keyed.process(new KeyedProcessFunction<Tuple2<String, String>, DataBean, Tuple2<String, Integer>>() {
private transient ValueState<Long> inState; //进来对应时间
private transient ValueState<Long> outState; //出去对应时间
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Long> inStateDescriptor = new ValueStateDescriptor<>("in-state", Long.class);
inState = getRuntimeContext().getState(inStateDescriptor);
ValueStateDescriptor<Long> outStateDescriptor = new ValueStateDescriptor<>("out-state", Long.class);
outState = getRuntimeContext().getState(outStateDescriptor);
}
@Override
public void processElement(DataBean bean, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Long timestamp = bean.getTimestamp();
String eventId = bean.getEventId();
if ("liveEnter".equals(eventId)) {
inState.update(timestamp);
//添加定时器
ctx.timerService().registerProcessingTimeTimer(timestamp + 60000 + 1);
} else if ("liveLeave".equals(eventId)) {
Long inTime = inState.value();
outState.update(timestamp);
if (timestamp - inTime < 60000) {
//删除定时器
ctx.timerService().deleteProcessingTimeTimer(inTime + 60000 + 1);
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Long outTime = outState.value();
if (outTime == null) {
out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
} else {
Long inTime = inState.value();
if (inTime - outTime > 30 * 60 * 1000) { //距离上次离开超过30分钟再次进入,才重新累计一次人气
out.collect(Tuple2.of(ctx.getCurrentKey().f0, 1));
}
}
//14:00 -> 14:29
//15:00
}
}).print();
FlinkUtils.env.execute();
}
}
3、观看直播人数统计结果保存
建表:
CREATE TABLE tb_user_event2
(
`id` String comment '数据唯一id',
`anchorId` String comment '主播ID',
`deviceId` String comment '用户ID',
`eventId` String comment '事件ID',
`os` String comment '系统名称',
`province` String comment '省份',
`channel` String comment '下载渠道',
`deviceType` String comment '设备类型',
`eventTime` DateTime64 comment '数据中所携带的时间',
`date` String comment 'eventTime转成YYYYMMDD格式',
`hour` String comment 'eventTime转成HH格式',
`processTime` DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(processTime)
PARTITION BY (date, hour)
ORDER BY id;
写入代码在上面的方式二
三、打赏礼物需求分析
在MySQL中还有一种礼物表(维表),需要进行关联,关联维表通常的解决方案:
a) 每来一条数据查一次数据库(慢、吞吐量低)
b) 可以使用异步IO(相对快,消耗资源多,示意代码见下文)
c) 广播State(最快、适用于少量数据、数据可以变化的)
礼物表:小表、数据可能会发生变化(礼物停用、新增了一个礼物)
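上面b)的异步IO方案本文没有给出实现,这里补一个示意(仅为草图:连接信息沿用下文MySQLSource中的示例,表结构同下文的tb_live_gift;生产上应使用真正的异步客户端或连接池):用Flink的RichAsyncFunction把按gift_id查MySQL的动作包装成异步调用。
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 示意:方案b)用异步IO关联礼物维表(草图)
 * 用线程池把同步JDBC查询转成异步,避免阻塞算子主线程
 */
public class AsyncGiftLookupFunction extends RichAsyncFunction<DataBean, Tuple3<String, String, Double>> {
private transient Connection connection;
private transient ExecutorService executor;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit_mall?characterEncoding=utf-8", "root", "123456");
//示意用单线程,保证单个Connection不被并发使用
executor = Executors.newSingleThreadExecutor();
}
@Override
public void asyncInvoke(DataBean bean, ResultFuture<Tuple3<String, String, Double>> resultFuture) throws Exception {
executor.submit(() -> {
try {
String anchorId = bean.getProperties().get("anchor_id").toString();
int giftId = Integer.parseInt(bean.getProperties().get("gift_id").toString());
PreparedStatement ps = connection.prepareStatement("SELECT name, points FROM tb_live_gift WHERE id = ?");
ps.setInt(1, giftId);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
resultFuture.complete(Collections.singleton(Tuple3.of(anchorId, rs.getString("name"), rs.getDouble("points"))));
} else {
//维表中没有该礼物,这里选择不输出
resultFuture.complete(Collections.emptyList());
}
rs.close();
ps.close();
} catch (Exception e) {
resultFuture.completeExceptionally(e);
}
});
}
@Override
public void close() throws Exception {
executor.shutdown();
connection.close();
}
}
用法示意:AsyncDataStream.unorderedWait(过滤出打赏事件的流, new AsyncGiftLookupFunction(), 3000, TimeUnit.MILLISECONDS, 100);下文实际采用的是c)广播State。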
统计的具体指标:
1.各个主播的收到礼物的数量、积分值
2.打赏礼物的数量、受欢迎礼物topN(可以将数据写入到数据库中,查询时再排序)
3.做多维度的指标统计(ClickHouse)
1、按照主播(直播间)统计礼物的积分(抖币)
统计的具体指标:各个主播的收到礼物的数量、积分值
礼物表使用自定义的MySQLSource(JDBC),不停地周期读取数据
行为数据使用KafkaSource
package cn._51doit.jobs;
import cn._51doit.kafka.MyKafkaStringDeserializationSchema;
import cn._51doit.pojo.DataBean;
import cn._51doit.source.MySQLSource;
import cn._51doit.udf.GiftConnectFunction;
import cn._51doit.udf.JsonToBeanFuncV2;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class GiftPointsCount {
public static void main(String[] args) throws Exception {
DataStreamSource<Tuple4<Integer, String, Double, Integer>> mysqlStream = FlinkUtils.env.addSource(new MySQLSource());
MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor = new MapStateDescriptor<>("gift-broadcast-state", TypeInformation.of(Integer.class), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
}));
BroadcastStream<Tuple4<Integer, String, Double, Integer>> broadcastStream = mysqlStream.broadcast(stateDescriptor);
DataStream<Tuple2<String, String>> kafkaStream = FlinkUtils.createKafkaStreamV2(args, MyKafkaStringDeserializationSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFuncV2());
SingleOutputStreamOperator<DataBean> filterdStream = beanStream.filter(bean -> "liveReward".equals(bean.getEventId()));
SingleOutputStreamOperator<Tuple3<String, String, Double>> anchorIdNamePointsStream = filterdStream.connect(broadcastStream).process(new GiftConnectFunction(stateDescriptor));
//先按照主播ID进行key,将point进行sum
SingleOutputStreamOperator<Tuple3<String, String, Double>> sum = anchorIdNamePointsStream.keyBy(tp -> tp.f0).sum(2);
sum.print();
FlinkUtils.env.execute();
}
}
MySQLSource
package cn._51doit.source;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.*;
public class MySQLSource extends RichSourceFunction<Tuple4<Integer, String, Double, Integer>> {
private Connection connection = null;
private PreparedStatement preparedStatement = null;
private boolean flag = true;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/doit_mall?characterEncoding=utf-8", "root", "123456");
}
@Override
public void run(SourceContext<Tuple4<Integer, String, Double, Integer>> ctx) throws Exception {
long timestamp = 0;
while (flag) {
String sql = "SELECT id, name, points, deleted FROM tb_live_gift WHERE updateTime > ?" + (timestamp == 0 ? " AND deleted = 0" : "");
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setTimestamp(1, new Timestamp(timestamp)); //用Timestamp保留时分秒,避免同一天内已读过的数据被反复读取
ResultSet resultSet = preparedStatement.executeQuery();
timestamp = System.currentTimeMillis();
while (resultSet.next()) {
int id = resultSet.getInt("id");
String name = resultSet.getString("name");
double points = resultSet.getDouble("points");
int deleted = resultSet.getInt("deleted");
ctx.collect(Tuple4.of(id, name, points, deleted));
}
resultSet.close();
Thread.sleep(10000);
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
preparedStatement.close();
connection.close();
}
}
GiftConnectFunction
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class GiftConnectFunction extends BroadcastProcessFunction<DataBean, Tuple4<Integer, String, Double, Integer>, Tuple3<String, String, Double>> {
private MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor;
public GiftConnectFunction(MapStateDescriptor<Integer, Tuple2<String, Double>> stateDescriptor) {
this.stateDescriptor = stateDescriptor;
}
@Override
public void processElement(DataBean value, ReadOnlyContext ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
ReadOnlyBroadcastState<Integer, Tuple2<String, Double>> broadcastState = ctx.getBroadcastState(stateDescriptor);
String anchorId =value.getProperties().get("anchor_id").toString();
Integer giftId = Integer.parseInt(value.getProperties().get("gift_id").toString());
Tuple2<String, Double> tp = broadcastState.get(giftId);
if (tp != null) {
out.collect(Tuple3.of(anchorId, tp.f0, tp.f1));
} else {
out.collect(Tuple3.of(anchorId, giftId.toString(), 0.0)); //维表中没有该礼物时积分记为0,避免下游sum出现空指针
}
}
@Override
public void processBroadcastElement(Tuple4<Integer, String, Double, Integer> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
BroadcastState<Integer, Tuple2<String, Double>> broadcastState = ctx.getBroadcastState(stateDescriptor);
Integer id = value.f0;
String name = value.f1;
Double points = value.f2;
Integer deleted = value.f3;
if(deleted == 0) {
broadcastState.put(id, Tuple2.of(name, points));
} else {
broadcastState.remove(id);
}
}
}
2、热门商品TopN需求分析
统计的具体指标:每隔1分钟统计一次最近10分钟内,各个分类、各种事件类型下的热门商品(商品ID)
[20:00, 20:10],手机,浏览,华为p40,50000
[20:00, 20:10],手机,浏览,vivo s27,3000
[20:00, 20:10],手机,浏览,mi11,2000
package cn._51doit.jobs;
import cn._51doit.pojo.DataBean;
import cn._51doit.pojo.ItemEventCount;
import cn._51doit.udf.HotGoodsAggregateFunction;
import cn._51doit.udf.HotGoodsTopNFunction;
import cn._51doit.udf.HotGoodsWindowFunction;
import cn._51doit.udf.JsonToBeanFunc;
import cn._51doit.utils.FlinkUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.List;
public class HotGoodsCount {
public static void main(String[] args) throws Exception {
DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args, SimpleStringSchema.class);
SingleOutputStreamOperator<DataBean> beanStream = kafkaStream.process(new JsonToBeanFunc());
SingleOutputStreamOperator<DataBean> filteredStream = beanStream.filter(bean -> bean.getEventId().startsWith("product"));
//按照EventTime划分窗口
SingleOutputStreamOperator<DataBean> beanStreamWithWaterMark = filteredStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<DataBean>forBoundedOutOfOrderness(Duration.ofMillis(5000))
.withTimestampAssigner((bean, ts) -> bean.getTimestamp()));
KeyedStream<DataBean, Tuple3<String, String, String>> keyedStream = beanStreamWithWaterMark.keyBy(new KeySelector<DataBean, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> getKey(DataBean value) throws Exception {
String eventId = value.getEventId();
String categoryId = value.getProperties().get("category_id").toString();
String productId = value.getProperties().get("product_id").toString();
return Tuple3.of(eventId, categoryId, productId);
}
});
WindowedStream<DataBean, Tuple3<String, String, String>, TimeWindow> window = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
//不要使用全量聚合,而是使用增量聚合
//商品ID, 事件ID,分类Id,次数,窗口起始时间,窗口结束时间
SingleOutputStreamOperator<ItemEventCount> aggRes = window.aggregate(new HotGoodsAggregateFunction(), new HotGoodsWindowFunction());
//接下来要排序
//分类ID、事件ID、同一个窗口的数据分到同一个组内
KeyedStream<ItemEventCount, Tuple4<String, String, Long, Long>> keyedAggStream = aggRes.keyBy(new KeySelector<ItemEventCount, Tuple4<String, String, Long, Long>>() {
@Override
public Tuple4<String, String, Long, Long> getKey(ItemEventCount value) throws Exception {
return Tuple4.of(value.categoryId, value.eventId, value.windowStart, value.windowEnd);
}
});
SingleOutputStreamOperator<List<ItemEventCount>> process = keyedAggStream.process(new HotGoodsTopNFunction());
process.print();
FlinkUtils.env.execute();
}
}
HotGoodsTopNFunction
package cn._51doit.udf;
import cn._51doit.pojo.ItemEventCount;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class HotGoodsTopNFunction extends KeyedProcessFunction<Tuple4<String, String, Long, Long>, ItemEventCount, List<ItemEventCount>> {
private transient ValueState<List<ItemEventCount>> listValueState;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<List<ItemEventCount>> stateDescriptor = new ValueStateDescriptor<>("item-list-state", TypeInformation.of(new TypeHint<List<ItemEventCount>>() {
}));
listValueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(ItemEventCount value, Context ctx, Collector<List<ItemEventCount>> out) throws Exception {
List<ItemEventCount> lst = listValueState.value();
if(lst == null) {
lst = new ArrayList<>();
}
lst.add(value);
listValueState.update(lst);
//注册定时器
//long currentWatermark = ctx.timerService().currentWatermark();
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemEventCount>> out) throws Exception {
//排序后再输出
List<ItemEventCount> lst = listValueState.value();
lst.sort((a, b) -> Long.compare(b.count, a.count));
List<ItemEventCount> sortedList = lst.subList(0, Math.min(3, lst.size()));
//排完序输出后,清空状态
listValueState.clear();
out.collect(sortedList);
}
}
HotGoodsAggregateFunction
package cn._51doit.udf;
import cn._51doit.pojo.DataBean;
import org.apache.flink.api.common.functions.AggregateFunction;
public class HotGoodsAggregateFunction implements AggregateFunction<DataBean, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(DataBean value, Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(Long accumulator) {
return accumulator;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
}
HotGoodsWindowFunction
package cn._51doit.udf;
import cn._51doit.pojo.ItemEventCount;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class HotGoodsWindowFunction implements WindowFunction<Long, ItemEventCount, Tuple3<String, String, String>, TimeWindow> {
//窗口触发后每一个组会调用一次(在窗口内增量聚合后的数据)
@Override
public void apply(Tuple3<String, String, String> tp, TimeWindow window, Iterable<Long> input, Collector<ItemEventCount> out) throws Exception {
String eventId = tp.f0;
String categoryId = tp.f1;
String productId = tp.f2;
Long count = input.iterator().next();
long start = window.getStart();
long end = window.getEnd();
out.collect(new ItemEventCount(productId, eventId, categoryId, count, start, end));
}
}
ItemEventCount
package cn._51doit.pojo;
public class ItemEventCount {
public String productId; // 商品ID
public String eventId; // 事件类型
public String categoryId; //商品分类ID
public long count; // 商品的点击量
public long windowStart; // 窗口开始时间戳
public long windowEnd; // 窗口结束时间戳
public ItemEventCount(String productId, String eventId, String categoryId, long count, long windowStart, long windowEnd) {
this.productId = productId;
this.eventId = eventId;
this.categoryId = categoryId;
this.count = count;
this.windowStart = windowStart;
this.windowEnd = windowEnd;
}
@Override
public String toString() {
return "ItemEventCount{" +
"productId='" + productId + '\'' +
", eventId='" + eventId + '\'' +
", categoryId='" + categoryId + '\'' +
", count=" + count +
", windowStart=" + windowStart +
", windowEnd=" + windowEnd +
'}';
}
}