Flink-使用flink处理函数以及状态编程实现TopN案例

时间:2022-12-01 11:15:20

7.5 应用案例-TopN

7.5.1 使用ProcessAllWindowFunction

  1. 场景

例如,需要统计最近10秒内最热门的两个url链接,并且每5秒

  1. 思路
  • 使用全窗口函数ProcessAllWindowFunction开窗处理,使用HashMap来保存每个url的访问次数(通过遍历)
  • 然后转成ArrayList,然后进行排序,取前两名输出即可
  1. 代码
  • 数据源代码
public class ClickSource implements SourceFunction<Event> {

    //声明一个标志位控制数据生成
    private Boolean running = true;
    @Override
    //泛型为Event
    public void run(SourceContext<Event> ctx) throws Exception {

        //随机生成数据
        Random random = new Random();
        //定义字段选取的数据集
        String[] users = {"Mary","Alice","Bob","Cary"};
        String[] urls = {"./home","./cart","./fav","./prod?id=100","/prod?id=10"};

        //一直循环生成数据
        while (running){
            String user = users[random.nextInt(users.length-1)];
            String url = urls[random.nextInt(urls.length-1)];
            //系统当前事件的毫秒数
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            //collect收集Event发往下游
            ctx.collect(new Event(user,url,timestamp));

            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running =false;
    }
}
  • 核心代码
public class TopNExample_ProcessAllWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序种延迟0,相当于-1毫秒而已
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        //直接开窗,收集数据排序
        stream.map(data->data.url)//得到String类型的Stream
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))//直接开窗
                .aggregate(new UrlHashMapCountAgg(),new UrlAllWindowResult())
                .print();

        env.execute();
    }
    //实现自定义的增量聚合函数
    public static class UrlHashMapCountAgg implements AggregateFunction<String, HashMap<String,Long>, ArrayList<Tuple2<String,Long>>> {
        @Override
        public HashMap<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
            if(accumulator.containsKey(value)){
                Long count = accumulator.get(value);
                accumulator.put(value,count+1);
            }else {
                accumulator.put(value,1L);
            }
            return accumulator;
        }

        //就HashMap转成ArrayList<Tuple2<String, Long>>的操作
        @Override
        public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
            ArrayList<Tuple2<String, Long>> result = new ArrayList<>();
            for(String key:accumulator.keySet()){
                result.add(Tuple2.of(key,accumulator.get(key)));
            }
            //排序
            result.sort(new Comparator<Tuple2<String, Long>>() {
                @Override
                //降序,后减前
                public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                    return o2.f1.intValue()-o1.f1.intValue();
                }
            });
            return result;
        }

        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            return null;
        }
    }
    //实现自定义全窗口函数,包装信息输出结果
    public static class UrlAllWindowResult extends ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow> {
        @Override
        public void process(Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {

            //先拿出来
            ArrayList<Tuple2<String, Long>> list = elements.iterator().next();

            StringBuilder result = new StringBuilder();
            result.append("---------------\n");

            //获取窗口信息
            result.append("窗口结束时间:"+new Timestamp(context.window().getEnd())+"\n");

            //取List排过序后的前两个,包装信息输出
            for(int i = 0;i<2;i++){
                Tuple2<String, Long> currTuple = list.get(i);
                String info = "No."+(i+1)+" "
                            +"url:"+currTuple.f0+" "
                            +"访问量"+currTuple.f1+"\n ";
                result.append(info);
            }
            result.append("--------------------\n");

            out.collect(result.toString());

        }
    }
}
  • 结果
窗口结束时间:2022-11-25 21:58:35.0
No.1 url:./fav 访问量1
 No.2 url:./home 访问量1
 --------------------

---------------
窗口结束时间:2022-11-25 21:58:40.0
No.1 url:./home 访问量3
 No.2 url:./prod?id=100 访问量3
 --------------------

---------------
窗口结束时间:2022-11-25 21:58:45.0
No.1 url:./prod?id=100 访问量4
 No.2 url:./cart 访问量2
 --------------------

---------------
窗口结束时间:2022-11-25 21:58:50.0
No.1 url:./prod?id=100 访问量4
 No.2 url:./fav 访问量3
 --------------------
  1. 评价

用这个方法思路易懂,但是使用了windowAll的全窗口函数,stream直接开窗,所有数据收集到窗口中,导致无分区也就是并行度会变成1,大数据场景下内存估计会炸产生OOM

7.5.2 使用 KeyedProcessFunction

  1. 场景

例如,需要统计最近10秒内最热门的两个url链接,并且每5秒

  1. 思路
  • 触发

    • 参照窗口的流式处理原理,将数据汇聚一段时间后输出,就可以使用定时器

    • 窗口结束时间+1豪秒使得watermark触发,即数据到齐

  • 收集

    • 定义一个列表把所有数据保存下来
    • 使用状态,根据之前keyby按键分组的状态
  • 输出

    • 排序
    • 输出
  1. 代码

跟上面差不多,多了状态设置,可以理解urlViewCountListState这个就是用来存有状态的数据的

  • 代码
public class TopNExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                //乱序种延迟0,相当于-1毫秒而已
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        //1.按照url分组,统计窗口内每个url的访问量
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream
                .keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.UrlViewCountResult());

        urlCountStream.print("url count");

        //2.对一同一窗口统计出的访问量,进行手机和排序(以聚合过的结果按照窗口间隔不间断流式输出)
        urlCountStream.keyBy(data->data.windowEnd)
                        .process(new TopNProcessResult(2))
                        .print();
        env.execute();


    }
    //实现自定义的KeyProcessFunction
    public static class TopNProcessResult extends KeyedProcessFunction<Long,UrlViewCount,String> {
        //定义一个属性n
        private Integer n;

        //1.定义列表状态
        private ListState<UrlViewCount> urlViewCountListState;

        public TopNProcessResult(Integer n) {
            this.n = n;
        }

        //2.管理状态,在环境中获取状态,使用生命周期方法获取
        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState= getRuntimeContext().getListState(//传入描述器
                    //两个参数:一个名字,一个类型
                    new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)));
        }

        @Override
        public void processElement(UrlViewCount value,Context ctx, Collector<String> out) throws Exception {
            //3.将数据保存到状态中
            urlViewCountListState.add(value);
            //4.注册windowEnd+1ms的定时器
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey()+1);
        }
        //5.用来触发定时器
        //将状态拿出来,保存成ArrayList
        //输出包装
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            for(UrlViewCount urlViewCount:urlViewCountListState.get())//得到OUT是一个iterable类型
                urlViewCountArrayList.add(urlViewCount);

            //排序
            urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                   return o2.count.intValue()-o1.count.intValue();
                }
            });


            //6.包装信息打印输出

            StringBuilder result = new StringBuilder();
            result.append("---------------\n");

            //获取窗口信息
            result.append("窗口结束时间:"+new Timestamp(ctx.getCurrentKey())+"\n");

            //包装信息输出
            for(int i = 0;i<2;i++){
                UrlViewCount currTuple = urlViewCountArrayList.get(i);
                String info = "No."+(i+1)+" "
                        +"url:"+currTuple.url+" "
                        +"访问量"+currTuple.count+"\n ";
                result.append(info);
            }
            result.append("--------------------\n");

            out.collect(result.toString());
            
        }
    }
}
  • 结果
url count> UrlViewCount{url='./home', count=1, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
url count> UrlViewCount{url='./cart', count=2, windowStart=2022-11-25 22:42:30.0, windowEnd=2022-11-25 22:42:40.0}
---------------
窗口结束时间:2022-11-25 22:42:40.0
No.1 url:./cart 访问量2
 No.2 url:./home 访问量1
 --------------------

url count> UrlViewCount{url='./home', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./prod?id=100', count=2, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
url count> UrlViewCount{url='./cart', count=4, windowStart=2022-11-25 22:42:35.0, windowEnd=2022-11-25 22:42:45.0}
---------------
窗口结束时间:2022-11-25 22:42:45.0
No.1 url:./cart 访问量4
 No.2 url:./home 访问量2
 --------------------
  • 评价

可以做并行计算