基于Filebeat+Kafka+Flink仿天猫双11实时交易额

时间:2022-08-27 09:07:43

1. 写在前面

在大数据实时计算方向,天猫双11的实时交易额是最具权威性的,当然技术架构也是相当复杂的,不是本篇博客的简单实现,因为天猫双11的数据是多维度多系统,实时粒度更微小的。当然在技术的总体架构上是相近的,主要的组件都是用到大数据实时计算组件Flink(当然阿里是用了基于Flink深度定制和优化改装的Blink)。下图是天猫双11实时交易额的大体架构模型及数据流向(参照https://baijiahao.baidu.com/s?id=1588506573420812062&wfr=spider&for=pc)

基于Filebeat+Kafka+Flink仿天猫双11实时交易额

2. 仿天猫双11实时交易额技术架构

利用Linux shell自动化模拟每秒钟产生一条交易额数据,数据内容为用户id,购买商品的付款金额,用户所在城市及所购买的商品

基于Filebeat+Kafka+Flink仿天猫双11实时交易额

技术架构上利用Filebeat去监控每生产的一条交易额记录,Filebeat将交易额输出到Kafka(关于Filebeat和kafka的安装或应用请参照之前的博客),然后编写Flink客户端程序去实时消费Kafka数据,对数据进行两块计算,一块是统计实时总交易额,一块是统计不同城市的实时交易额

技术架构图

基于Filebeat+Kafka+Flink仿天猫双11实时交易额

3.具体实现

3.1. 模拟交易额数据double11.sh脚本

#!/bin/bash
i=1
for i in $(seq 1 60)
do
customernum=`openssl rand -base64 8 | cksum | cut -c1-8`
pricenum=`openssl rand -base64 8 | cksum | cut -c1-4`
citynum=`openssl rand -base64 8 | cksum | cut -c1-2`
itemnum=`openssl rand -base64 8 | cksum | cut -c1-6`
echo "customer"$customernum","$pricenum",""city"$citynum",""item"$itemnum >> /home/hadoop/tools/double11/double11.log
sleep 1
done

将double11.sh放入Linux crontab

#每分钟执行一次
* * * * * sh /home/hadoop/tools/double11/double11.sh

3.2. 实时监控double11.log

Filebeat实时监控double11.lo*生的每条交易额记录,将记录实时流向到Kafka的topic,这里只需要对Filebeat的beat-kafka.yml做简单配置,kafka只需要启动就好

基于Filebeat+Kafka+Flink仿天猫双11实时交易额

3.3. 核心:编写Flink客户端程序

这里将统计实时总交易额和不同城市的实时交易额区分写成两个类(只提供Flink Java API)

需要导入的maven依赖

    <dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.0.0</version>
</dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.0.0</version>
</dependency> <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.0.0</version>
</dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency> </dependencies>

统计实时总交易额代码

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector; public class Double11Sum {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(1000); Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
properties); DataStream<String> stream = env.addSource(myConsumer); DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);
counts.print(); env.execute("Double 11 Real Time Transaction Volume");
} //统计总的实时交易额
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L; public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
JSONObject object = JSONObject.parseObject(value);
String message = object.getString("message");
Integer price = Integer.parseInt(message.split(",")[1]);
out.collect(new Tuple2<String, Integer>("price", price));
}
}
}

统计不同城市的实时交易额

package com.fastweb;

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector; public class Double11SumByCity {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(1000); Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.184.12:9092");
properties.setProperty("zookeeper.connect", "192.168.184.12:2181");
properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
properties); DataStream<String> stream = env.addSource(myConsumer); DataStream<Tuple2<String, Integer>> cityCounts = stream.flatMap(new CitySplitter()).keyBy(0).sum(1);
cityCounts.print(); env.execute("Double 11 Real Time Transaction Volume");
} //按城市分类汇总实时交易额
public static final class CitySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L; public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
JSONObject object = JSONObject.parseObject(value);
String message = object.getString("message");
Integer price = Integer.parseInt(message.split(",")[1]);
String city = message.split(",")[2];
out.collect(new Tuple2<String, Integer>(city, price));
}
}
}

代码解释:这里可以方向两个类里面只有flatMap的对数据处理的内部类不同,但两个内部类的结构基本相同,在内部类里面利用fastjson解析了一层获取要得到的数据,这是因为经过Filebeat监控的数据是json格式的,Filebeat这样实现是为了在正式的系统上确保每条数据的来源IP,时间戳等信息

3.4. 验证

启动Double11Sum类的main方法就可以得到实时的总交易额,按城市分类的实时交易额也一样,这个结果是实时更新的,每条记录都是新的

基于Filebeat+Kafka+Flink仿天猫双11实时交易额

基于Filebeat+Kafka+Flink仿天猫双11实时交易额的更多相关文章

  1. 2018天猫双11各类目品牌成交额top10排行榜

    2018天猫双11总成交额213,550,497,011元,你知道各类目品牌成交额排行吗?一起来看看吧,赶紧收藏,以后就知道要怎么买了! 相关阅读: 2018天猫双11各类目品牌成交额top10排行榜 ...

  2. 天猫双11红包前端jQuery

    [01]   浏览器支持:IE10+和其他现代浏览器.   效果图:       步骤:   HTML部分:   <div class="opacity" style=&qu ...

  3. 我们知道CDN护航了双11十年,却不知道背后有那么多故事……

    情不知如何而起,竟一往情深.恰如我们.十年前,因为相信,所以看见.十年后,就在眼前,看见一切. 当2018天猫双11成交额2135亿元的大屏上,打出这么一段字的时候,参与双11护航的阿里云CDN技术掌 ...

  4. 2684亿!阿里CTO张建锋:不是任何一朵云都撑得住双11

    2019天猫双11 成交额2684亿! "不是任何一朵云都能撑住这个流量.中国有两朵云,一朵是阿里云,一朵叫其他云."11月11日晚,阿里巴巴集团CTO张建锋表示,"阿里 ...

  5. 欠了好久的CRM帖子,双11来读。

    又一年双11了,觉得天猫双11越来越没特色了. 从折扣,音符旋律到红包,今年15年却找不出往年的热度,只是商家还是一样的急,备着活动目标计划,做着库存价格打标视觉设计这种苦逼的日子. 欠了好久的CRM ...

  6. 深入探访支付宝双11十年路,技术凿穿焦虑与想象极限 &vert; CYZONE特写

    小蚂蚁说: 双11十年间,交易规模的指数级增长不断挑战人们的想象力,而对蚂蚁技术团队来说,这不仅是一场消费盛宴,而是无数次濒临压力和焦虑极限的体验,更是技术的练兵场.如今双11对蚂蚁金服而言,已经绝不 ...

  7. 最强CP!阿里云联手支付宝小程序如何助力双11&quest;

    作为首次“全面上云”的双11,阿里云征服了每秒订单峰值54.4万笔的世界新记录.正是在阿里云的保驾护航下,即使访问量是平时的5到6倍,小程序也鲜少出现卡顿或者宕机的现象,“依靠阿里云,我们整个天猫双1 ...

  8. 不仅仅是双11大屏—Flink应用场景介绍

    双11大屏 每年天猫双十一购物节,都会有一块巨大的实时作战大屏,展现当前的销售情况. 这种炫酷的页面背后,其实有着非常强大的技术支撑,而这种场景其实就是实时报表分析. 实时报表分析是近年来很多公司采用 ...

  9. ELK&plus;Filebeat&plus;Kafka&plus;ZooKeeper 构建海量日志分析平台&lpar;elk5&period;2&plus;filebeat2&period;11&rpar;

    ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台 参考:http://www.tuicool.com/articles/R77fieA 我在做ELK日志平台开始之初选择为 ...

随机推荐

  1. struts2自定义MVC框架

    自定义MVC:(首先了解Model1和Model2的概念) Model1与Model2: Model1:就是一种纯jsp开发技术,将业务逻辑代码和视图渲染代码杂糅在一起. Model2:Model2是 ...

  2. (转)WCF开发框架形成之旅---WCF的几种寄宿方式

    WCF寄宿方式是一种非常灵活的操作,可以在IIS服务.Windows服务.Winform程序.控制台程序中进行寄宿,从而实现WCF服务的运行,为调用者方便.高效提供服务调用.本文分别对这几种方式进行详 ...

  3. app启动速度

    跟踪代码发现,应用启动时的白屏会持续到draw调用完成,这个过程中任何耗时操作将导致白屏时间增长. 1.adb shell am start -W -n yourpakagename/MainActi ...

  4. &lbrack;python&rsqb; 字符串与列表、字典的转换

    1.字符串->字典:eval(str) 2.字符串->列表:list(str)

  5. webpack构建react项目&lpar;一&rpar;

    前言 下面是我们使用到技术栈: webpack + react + redux + react-router + react-thunk + ES6 + .... 注意事项: 建议使用npm5.X 或 ...

  6. 一文学会matplotlib

    matplotlib基础 “““ 假设一天中每隔两个小时(range(2,26,2))的气温(℃)分别是[15,13,14.5,17,20,25,26,26,27,22,18,15] 用matplot ...

  7. org&period;apache&period;flume&period;conf&period;ConfigurationException&colon; Channel c1 not in active set&period;

    1 错误详细信息 WARN conf.FlumeConfiguration: Could not configure sink k1 due to: Channel c1 not in active ...

  8. springboot&plus;cxf 开发webservice

    参考 https://www.cnblogs.com/fuxin41/p/6289162.html pom.xml <?xml version="1.0" encoding= ...

  9. HighCharts 特性;Highcharts 环境配置

    Highcharts Highcharts 是一个用纯JavaScript编写的一个图表库. Highcharts 能够很简单便捷的在web网站或是web应用程序添加有交互性的图表 Highchart ...

  10. 使用UrlRewriteFilter对url进行更替

    一般来说,使用struts之后url的访问实际*问的是action的地址,为了不让该地址暴露给别人,可以采用UrlRewriteFilter来对url进行重写. 首先,在web.xml里面配置: & ...