【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】

时间:2022-12-19 09:53:50

【Flink实时数仓】数据仓库项目实战 《四》日志数据分流-流量域 【DWD】

DWD层设计要点:
(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。
(2)DWD层表名的命名规范为dwd_数据域_表名

1.流量域未经加工的事务事实表

1.1主要任务

1.1.1数据清洗(ETL)

数据传输过程中可能会出现部分数据丢失的情况,导致 JSON 数据结构不再完整,因此需要对脏数据进行过滤。

1.1.2新老访客状态标记修复

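日志数据 common 字段下的 is_new 字段用于标记新老访客状态:1 表示新访客,0 表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。

修复思路:按 mid 分组,用键控状态记录该设备首次出现的日期。

- 若 is_new 为 1:
  - 状态为空,则将当日日期写入状态;
  - 状态不为空且与当日日期不同,则将 is_new 修正为 0;
  - 与当日日期相同,则保持不变。
- 若 is_new 为 0 而状态为空,说明该设备此前已访问过,只是其日志首次进入本程序,此时将状态设为昨日日期。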

1.1.3利用侧输出流实现数据拆分

本节将通过侧输出流对日志数据进行拆分,生成以下五张事务事实表并写入 Kafka:
流量域页面浏览事务事实表
流量域启动事务事实表
流量域动作事务事实表
流量域曝光事务事实表
流量域错误事务事实表

1.2图解

【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】

1.3代码

代码来自尚硅谷,微信关注尚硅谷公众号 回复: 大数据 即可获取源码及资料。

以下仅展示主流程代码,工具类(如 MyKafkaUtil、DateFormatUtil)的具体实现请下载源码查看。

package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

//数据流:web/app -> Nginx -> 日志服务器(.log) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//程  序:     Mock(lg.sh) -> Flume(f1) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK)
public class BaseLogApp {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //生产环境中设置为Kafka主题的分区数

        //1.1 开启CheckPoint
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //1.2 设置状态后端
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2.消费Kafka topic_log 主题的数据创建流
        String topic = "topic_log";
        String groupId = "base_log_app_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3.过滤掉非JSON格式的数据&将每行数据转换为JSON对象
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {

                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(dirtyTag, value);
                }
            }
        });
        //获取侧输出流脏数据并打印
        DataStream<String> dirtyDS = jsonObjDS.getSideOutput(dirtyTag);
        dirtyDS.print("Dirty>>>>>>>>>>>>");

        //TODO 4.按照Mid分组
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        //TODO 5.使用状态编程做新老访客标记校验
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {

                //获取is_new标记 & ts 并将时间戳转换为年月日
                String isNew = value.getJSONObject("common").getString("is_new");
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);

                //获取状态中的日期
                String lastDate = lastVisitState.value();

                //判断is_new标记是否为"1"
                if ("1".equals(isNew)) {
                    if (lastDate == null) {
                        lastVisitState.update(curDate);
                    } else if (!lastDate.equals(curDate)) {
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else if (lastDate == null) {
                    lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));
                }
                return value;
            }
        });

        //TODO 6.使用侧输出流进行分流处理  页面日志放到主流  启动、曝光、动作、错误放到侧输出流
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("error") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                //尝试获取错误信息
                String err = value.getString("err");
                if (err != null) {
                    //将数据写到error侧输出流
                    ctx.output(errorTag, value.toJSONString());
                }

                //移除错误信息
                value.remove("err");

                //尝试获取启动信息
                String start = value.getString("start");
                if (start != null) {
                    //将数据写到start侧输出流
                    ctx.output(startTag, value.toJSONString());
                } else {

                    //获取公共信息&页面id&时间戳
                    String common = value.getString("common");
                    String pageId = value.getJSONObject("page").getString("page_id");
                    Long ts = value.getLong("ts");

                    //尝试获取曝光数据
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        //遍历曝光数据&写到display侧输出流
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page_id", pageId);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }

                    //尝试获取动作数据
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null && actions.size() > 0) {
                        //遍历曝光数据&写到display侧输出流
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page_id", pageId);
                            ctx.output(actionTag, action.toJSONString());
                        }
                    }

                    //移除曝光和动作数据&写到页面日志主流
                    value.remove("displays");
                    value.remove("actions");
                    out.collect(value.toJSONString());
                }
            }
        });

        //TODO 7.提取各个侧输出流数据
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        //TODO 8.将数据打印并写入对应的主题
        pageDS.print("Page>>>>>>>>>>");
        startDS.print("Start>>>>>>>>");
        displayDS.print("Display>>>>");
        actionDS.print("Action>>>>>>");
        errorDS.print("Error>>>>>>>>");

        String page_topic = "dwd_traffic_page_log";
        String start_topic = "dwd_traffic_start_log";
        String display_topic = "dwd_traffic_display_log";
        String action_topic = "dwd_traffic_action_log";
        String error_topic = "dwd_traffic_error_log";

        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(page_topic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(start_topic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(display_topic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(action_topic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(error_topic));

        //TODO 9.启动任务
        env.execute("BaseLogApp");

    }

}

1.4数据测试

1.4.1 测试脏数据

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>

结果:IDEA 控制台打印出该条脏数据,Kafka 中各 DWD 主题均无输出。
【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】

1.4.2 测试err 和 start 数据

输入数据

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>{"common":{"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","os":"Android 11.0","uid":"513","vc":"v2.1.134"},"err":{"error_code":2633,"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)"},"start":{"entry":"notice","loading_time":12438,"open_ad_id":7,"open_ad_ms":4407,"open_ad_skip_ms":0},"ts":1651217959000}
>

输出数据

[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_start_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_error_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":2633},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}

IDEA 控制台打印结果:
【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】

1.4.3 测试 display、action 和 page 数据

输入数据

{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}

输出数据

dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"ts":1651217961000}
dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"ts":1651217961000}
dwd_traffic_display_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic  dwd_traffic_display_log
{"display_type":"query","page_id":"good_detail","item":"15","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":1,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"26","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":3,"order":2,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"31","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":3,"ts":1651217961000}
{"display_type":"promotion","page_id":"good_detail","item":"29","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":5,"order":4,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"9","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":5,"ts":1651217961000}
{"display_type":"recommend","page_id":"good_detail","item":"1","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":6,"ts":1651217961000}
dwd_traffic_action_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_action_log
{"page_id":"good_detail","item":"5","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}
dwd_traffic_error_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}

IDEA 控制台打印结果:
【Flink实时数仓】数据仓库项目实战 《四》日志数据分流 【DWD】