7.5 Application Case - TopN
7.5.1 Using ProcessAllWindowFunction
- Scenario
For example, find the two most popular urls over the last 10 seconds, refreshing the result every 5 seconds.
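- Approach
- Open a window over the whole (non-keyed) stream with windowAll. An incremental AggregateFunction keeps each url's visit count in a HashMap, and the all-window function ProcessAllWindowFunction attaches the window information to the result.
- When the window fires, convert the HashMap into an ArrayList, sort it by count, and output the top two.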
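The counting and sorting in the approach above can be checked without Flink. The sketch below is plain Java; the class name `UrlTopNSketch` is hypothetical. It counts urls in a HashMap, copies the entries into an ArrayList, sorts by count descending, and keeps at most n entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of the HashMap -> ArrayList -> sort -> top N idea.
class UrlTopNSketch {

    static List<Map.Entry<String, Long>> topN(List<String> urls, int n) {
        // 1. Count visits per url
        Map<String, Long> counts = new HashMap<>();
        for (String url : urls) {
            counts.merge(url, 1L, Long::sum);
        }
        // 2. Copy the entries into a list and sort by count, highest first
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        // 3. Keep at most n entries (a window may contain fewer distinct urls)
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        List<String> window = List.of("./home", "./cart", "./home", "./fav", "./home", "./cart");
        System.out.println(topN(window, 2)); // [./home=3, ./cart=2]
    }
}
```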
- Code
- Data source
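```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

// Event is assumed to be the POJO (user, url, timestamp) defined earlier in this chapter
public class ClickSource implements SourceFunction<Event> {
    // Flag that controls data generation; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    // The type parameter is Event
    public void run(SourceContext<Event> ctx) throws Exception {
        // Random generator for the fields
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10"};
        // Keep generating data until cancelled
        while (running) {
            // nextInt(bound) returns a value in [0, bound), so pass the full length
            // or the last element can never be chosen
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // Current system time in milliseconds
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            // collect() emits the Event downstream
            ctx.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```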
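`Random.nextInt(bound)` returns a value in [0, bound), so choosing an array element uniformly needs `nextInt(array.length)`; a bound of `length - 1` silently skips the last element. The plain-Java check below (hypothetical class `NextIntBoundCheck`, fixed seed chosen only for reproducibility) draws many indices with each bound and records which users appear.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Shows which array elements are reachable for a given nextInt bound.
class NextIntBoundCheck {

    // Draws `draws` random indices in [0, bound) and records the picked elements
    static Set<String> picked(String[] items, int bound, int draws, long seed) {
        Random random = new Random(seed);
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < draws; i++) {
            seen.add(items[random.nextInt(bound)]);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        // bound = length - 1: index 3 ("Cary") is never drawn
        System.out.println(picked(users, users.length - 1, 1000, 42L).contains("Cary")); // false
        // bound = length: every user is reachable
        System.out.println(picked(users, users.length, 1000, 42L).size()); // 4
    }
}
```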
- Core code
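```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class TopNExample_ProcessAllWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the data
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // Out-of-orderness bound of 0: the watermark is simply the max timestamp minus 1 ms
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        // Open a window directly on the whole stream, then count and sort
        stream.map(data -> data.url)  // a stream of url strings
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))  // window over all data
                .aggregate(new UrlHashMapCountAgg(), new UrlAllWindowResult())
                .print();
        env.execute();
    }

    // Custom incremental aggregate function: count visits per url
    public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>> {
        @Override
        public HashMap<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
            // Increment this url's count, starting at 1 for a new url
            accumulator.merge(value, 1L, Long::sum);
            return accumulator;
        }

        // Convert the HashMap into a sorted ArrayList<Tuple2<String, Long>>
        @Override
        public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
            ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
            for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                result.add(Tuple2.of(entry.getKey(), entry.getValue()));
            }
            // Sort descending by count (Long.compare avoids the overflow risk of subtraction)
            result.sort((o1, o2) -> Long.compare(o2.f1, o1.f1));
            return result;
        }

        // Only called for merging windows (e.g. session windows); sliding windows never merge,
        // but returning a real merged map is safer than returning null
        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            b.forEach((url, count) -> a.merge(url, count, Long::sum));
            return a;
        }
    }

    // Custom full-window function: attach the window information and emit the result
    public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
        @Override
        public void process(Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
            // The aggregate function emits exactly one (already sorted) list per window
            ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
            StringBuilder result = new StringBuilder();
            result.append("---------------\n");
            // Window information
            result.append("Window end: " + new Timestamp(context.window().getEnd()) + "\n");
            // Take the first two entries of the sorted list (fewer if the window holds fewer urls)
            for (int i = 0; i < Math.min(2, list.size()); i++) {
                Tuple2<String, Long> currTuple = list.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + currTuple.f0 + " "
                        + "views: " + currTuple.f1 + "\n";
                result.append(info);
            }
            result.append("--------------------\n");
            out.collect(result.toString());
        }
    }
}
```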
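The job relies on two event-time rules: which sliding windows (size 10 s, slide 5 s) an event falls into, and when a window fires under `forBoundedOutOfOrderness(Duration.ZERO)`. The plain-Java sketch below (hypothetical class `SlidingWindowSketch`, no Flink dependency) mirrors that arithmetic under the assumptions of window offset 0 and non-negative timestamps: window starts are multiples of the slide, the watermark is the max timestamp minus the delay minus 1 ms, and a window [start, end) fires once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the window-assignment and firing arithmetic used by the example (offset 0 assumed).
class SlidingWindowSketch {

    // Returns the [start, end) pairs of every sliding window containing `timestamp`
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // assumes timestamp >= 0
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Bounded out-of-orderness watermark: max timestamp - delay - 1
    static long watermark(long maxTimestamp, long delayMillis) {
        return maxTimestamp - delayMillis - 1;
    }

    // A window [start, end) fires when the watermark reaches end - 1
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        // An event at 12 s belongs to [10 s, 20 s) and [5 s, 15 s)
        for (long[] w : assign(12_000L, 10_000L, 5_000L)) {
            System.out.println(w[0] + " - " + w[1]);
        }
        // With zero delay, an event at 15 s moves the watermark to 14999 ms,
        // which fires the window ending at 15 s
        System.out.println(fires(15_000L, watermark(15_000L, 0L))); // true
    }
}
```

Because size / slide = 2, every click is counted in two overlapping windows, which is why a new ranking appears every 5 seconds.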
- Result
```text
---------------
Window end: 2022-11-25 21:58:35.0
No.1 url:./fav views: 1
No.2 url:./home views: 1
--------------------
---------------
Window end: 2022-11-25 21:58:40.0
No.1 url:./home views: 3
No.2 url:./prod?id=100 views: 3
--------------------
---------------
Window end: 2022-11-25 21:58:45.0
No.1 url:./prod?id=100 views: 4
No.2 url:./cart views: 2
--------------------
---------------
Window end: 2022-11-25 21:58:50.0
No.1 url:./prod?id=100 views: 4
No.2 url:./fav views: 3
--------------------
```
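In the window ending at 21:58:40, ./home and ./prod?id=100 both have 3 visits. A comparator on count alone leaves tied entries in their input order (List.sort is stable), and that order comes from HashMap iteration, so which one is reported as No.1 is not guaranteed. The sketch below (hypothetical class `TieBreakSort`, plain Java) adds the url as a secondary key so ties always resolve the same way; it is an optional refinement, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Orders (url, count) pairs by count descending, then by url ascending on ties.
class TieBreakSort {

    static final class UrlCount {
        private final String url;
        private final long count;

        UrlCount(String url, long count) {
            this.url = url;
            this.count = count;
        }

        String url() { return url; }
        long count() { return count; }
    }

    // Primary key: count, highest first; secondary key: url, alphabetical
    static final Comparator<UrlCount> BY_COUNT_DESC_THEN_URL =
            Comparator.comparingLong(UrlCount::count).reversed()
                    .thenComparing(UrlCount::url);

    public static void main(String[] args) {
        List<UrlCount> counts = new ArrayList<>(List.of(
                new UrlCount("./prod?id=100", 3),
                new UrlCount("./cart", 1),
                new UrlCount("./home", 3)));
        counts.sort(BY_COUNT_DESC_THEN_URL);
        for (UrlCount c : counts) {
            System.out.println(c.url() + " " + c.count());
        }
        // ./home 3
        // ./prod?id=100 3
        // ./cart 1
    }
}
```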
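- Assessment
This approach is easy to follow, but windowAll opens the window directly on the non-keyed stream, so every record flows into a single window operator. With no partitioning, that operator's parallelism is forced to 1: at large data volumes it becomes a bottleneck, and its state (one HashMap entry per distinct url per window) may exhaust memory and cause an OOM.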
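To see why keying matters, the plain-Java sketch below (hypothetical class `PartitioningSketch`) routes keys to subtasks. It is a simplified stand-in: Flink actually hashes each key's hashCode() (with a murmur hash) into a key group and maps key groups to subtasks, whereas here `Math.floorMod(hashCode, parallelism)` plays that role. The point it shows holds either way: the same key always lands on the same subtask, keys spread across subtasks, and a windowAll behaves like parallelism 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration of key-based routing; NOT Flink's real key-group algorithm.
class PartitioningSketch {

    // Maps subtask index -> keys routed to it
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String key : keys) {
            int subtask = Math.floorMod(key.hashCode(), parallelism); // stand-in for key-group hashing
            bySubtask.computeIfAbsent(subtask, s -> new ArrayList<>()).add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> urls = List.of("./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10");
        // Keyed: urls spread over up to 4 subtasks
        System.out.println(partition(urls, 4));
        // windowAll is like parallelism 1: everything on subtask 0
        System.out.println(partition(urls, 1)); // {0=[./home, ./cart, ./fav, ./prod?id=100, ./prod?id=10]}
    }
}
```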
7.5.2 Using KeyedProcessFunction
- Scenario
For example, find the two most popular urls over the last 10 seconds, refreshing the result every 5 seconds.
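- Approach
- Trigger: mimic how a window works in stream processing (gather data for a while, then emit it) by using a timer.
- Register the timer at window end + 1 ms. It fires only once the watermark passes that time, which means all counts for the window have arrived.
- Collect: keep every count belonging to the window in a list.
- Store that list in keyed state; since the stream was keyed by windowEnd beforehand, each window gets its own list.
- Output: sort the list.
- Output the top entries.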
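The trigger / collect / output steps above can be simulated in plain Java to see the order of events. In the sketch below, the class `TopNTimerSimulation` and its methods are hypothetical: a `HashMap` keyed by windowEnd stands in for the keyed list state, a `TreeMap` stands in for the timer service, and `advanceWatermark` fires every timer whose time is at or below the watermark, as event-time timers do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulates: save to per-key list state, timer at windowEnd + 1, sort and emit on fire.
class TopNTimerSimulation {

    static final class UrlCount {
        final String url;
        final long count;
        final long windowEnd;

        UrlCount(String url, long count, long windowEnd) {
            this.url = url;
            this.count = count;
            this.windowEnd = windowEnd;
        }
    }

    private final int n;
    private final Map<Long, List<UrlCount>> listState = new HashMap<>(); // key = windowEnd
    // timer time -> key; each key has exactly one timer time (windowEnd + 1), so no collisions
    private final TreeMap<Long, Long> timers = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    TopNTimerSimulation(int n) {
        this.n = n;
    }

    // processElement: save the record, register a timer at windowEnd + 1 (re-registering is a no-op)
    void processElement(UrlCount value) {
        listState.computeIfAbsent(value.windowEnd, k -> new ArrayList<>()).add(value);
        timers.put(value.windowEnd + 1, value.windowEnd);
    }

    // Advancing the watermark fires every timer whose time <= watermark
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            long key = timers.pollFirstEntry().getValue();
            onTimer(key);
        }
    }

    // onTimer: read and clear the window's state, sort by count, emit the top n
    private void onTimer(long windowEnd) {
        List<UrlCount> list = listState.remove(windowEnd);
        list.sort((a, b) -> Long.compare(b.count, a.count));
        StringBuilder sb = new StringBuilder("windowEnd=" + windowEnd);
        for (int i = 0; i < Math.min(n, list.size()); i++) {
            sb.append(" No.").append(i + 1).append(' ')
              .append(list.get(i).url).append('=').append(list.get(i).count);
        }
        output.add(sb.toString());
    }

    public static void main(String[] args) {
        TopNTimerSimulation sim = new TopNTimerSimulation(2);
        sim.processElement(new UrlCount("./home", 1, 40_000L));
        sim.processElement(new UrlCount("./cart", 2, 40_000L));
        sim.advanceWatermark(40_000L); // timer at 40001 is not due yet
        sim.advanceWatermark(40_001L); // timer fires
        System.out.println(sim.output); // [windowEnd=40000 No.1 ./cart=2 No.2 ./home=1]
    }
}
```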
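- Code
Mostly the same as the previous example, plus state handling: urlViewCountListState is the keyed list state that holds the UrlViewCount records of each window until its timer fires.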
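```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;

// UrlViewCount and UrlCountViewExample (UrlViewCountAgg, UrlViewCountResult) come from the previous section
public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the data
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // Out-of-orderness bound of 0: the watermark is simply the max timestamp minus 1 ms
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        // 1. Key by url and count each url's visits per window
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream
                .keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());
        urlCountStream.print("url count");
        // 2. Collect and sort the counts of the same window
        //    (the aggregated results keep streaming out, one batch per window slide)
        urlCountStream.keyBy(data -> data.windowEnd)
                .process(new TopNProcessResult(2))
                .print();
        env.execute();
    }

    // Custom KeyedProcessFunction: key = windowEnd, input = UrlViewCount, output = String
    public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String> {
        // How many top entries to output
        private Integer n;
        // 1. Declare the list state
        private ListState<UrlViewCount> urlViewCountListState;

        public TopNProcessResult(Integer n) {
            this.n = n;
        }

        // 2. Obtain the state from the runtime context in the open() lifecycle method
        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    // The descriptor takes two arguments: a name and a type
                    new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)));
        }

        @Override
        public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
            // 3. Save the record in state
            urlViewCountListState.add(value);
            // 4. Register a timer at windowEnd + 1 ms (the current key is windowEnd)
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        // 5. When the timer fires: copy the state into an ArrayList, sort it, and wrap the output
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            for (UrlViewCount urlViewCount : urlViewCountListState.get()) { // get() returns an Iterable
                urlViewCountArrayList.add(urlViewCount);
            }
            // This window is finished: clear its state so it does not grow forever
            urlViewCountListState.clear();
            // Sort descending by count
            urlViewCountArrayList.sort((o1, o2) -> Long.compare(o2.count, o1.count));
            // 6. Wrap the information and print it
            StringBuilder result = new StringBuilder();
            result.append("---------------\n");
            // Window information
            result.append("Window end: " + new Timestamp(ctx.getCurrentKey()) + "\n");
            // Output the top n (fewer if the window holds fewer urls)
            for (int i = 0; i < Math.min(n, urlViewCountArrayList.size()); i++) {
                UrlViewCount currTuple = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + currTuple.url + " "
                        + "views: " + currTuple.count + "\n";
                result.append(info);
            }
            result.append("--------------------\n");
            out.collect(result.toString());
        }
    }
}
```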
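`onTimer` copies the whole list state into an ArrayList and sorts it, which costs O(m log m) for m urls in a window. A common alternative, swapped in here and not what the code above does, keeps only the n largest counts in a min-heap of size n, which costs O(m log n). The plain-Java sketch below (hypothetical class `HeapTopN`) shows the idea on (url, count) entries.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Alternative top-N technique: a size-bounded min-heap instead of a full sort.
class HeapTopN {

    static List<Map.Entry<String, Long>> topN(Iterable<Map.Entry<String, Long>> counts, int n) {
        Comparator<Map.Entry<String, Long>> byCount = Map.Entry.comparingByValue();
        PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(byCount); // smallest count at the head
        for (Map.Entry<String, Long> e : counts) {
            heap.offer(e);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest, keeping the n largest seen so far
            }
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
        result.sort(byCount.reversed()); // highest count first
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> counts = List.of(
                Map.entry("./home", 2L), Map.entry("./prod?id=100", 3L),
                Map.entry("./cart", 4L), Map.entry("./fav", 1L));
        System.out.println(topN(counts, 2)); // [./cart=4, ./prod?id=100=3]
    }
}
```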
- Result
```text
url count> UrlViewCount{url='./home', count=1, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
url count> UrlViewCount{url='./cart', count=2, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
---------------
Window end: 2022-11-25 22:42:40.0
No.1 url:./cart views: 2
No.2 url:./home views: 1
--------------------
url count> UrlViewCount{url='./home', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./prod?id=100', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./cart', count=4, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
---------------
Window end: 2022-11-25 22:42:45.0
No.1 url:./cart views: 4
No.2 url:./home views: 2
--------------------
```
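- Assessment
Both stages are keyed (first by url, then by windowEnd), so each can run in parallel: the example sets parallelism to 1, but raising it spreads the keys across subtasks instead of funnelling all data through one.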