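Hands-On Log Analysis with Spark SQL

Date: 2024-05-22 18:54:22

Contents

Log Data Contents

Why Analyze User Behavior Logs

Offline Data Processing Pipeline

Requirements

Data Cleaning

Parsing the Access Log

Using an Open-Source Project from GitHub

Statistical Analysis of the Logs

Top-N Most Popular Videos by Access Count

Top-N Most Popular imooc Courses by City

Top-N Most Popular imooc Courses by Traffic

Tuning Notes

Visualization with ECharts

Displaying Static Data

Displaying Dynamic Data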


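Log Data Contents

  1. System attributes of the visitor: operating system and browser
  2. Access characteristics: the URL clicked, the referring URL, and the time spent on the page
  3. Access information: session_id and the client IP (from which the city is derived)

Why Analyze User Behavior Logs

  • The website's eyes: the logs show how visitors actually use the site.
  • The website's nerves: they signal whether the layout is sensible, the navigation clear, and the content appropriate.
  • The website's brain: they inform the goals of the analysis and which optimizations to make.

Offline Data Processing Pipeline

  • 1) Data collection: gather the logs with Flume
  • 2) Data cleaning: the collected data contains many dirty records that do not conform to the log format
  • 3) Data processing: compute the statistics that the business requirements call for
  • 4) Storing results: results can be written to an RDBMS or a NoSQL store
  • 5) Visualization: ECharts, HUE, or Zeppelin

Requirements

  1. Top-N access counts for the most popular courses and articles (手记) on the imooc main site
  2. Top-N most popular courses on the imooc main site, by city
  3. Top-N most popular courses on the imooc main site, by traffic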

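Data Cleaning

  1. First convert the date format used in the log file.

SimpleDateFormat is not thread-safe, so the utility below uses FastDateFormat from Apache Commons Lang instead.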

package com.rachel

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {
    // Date format used in the input log file
    val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z",Locale.ENGLISH)
    // Target format
    val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

    def parse(time:String)={
      TARGET_FORMAT.format(new Date(getTime(time)))
    }

    def getTime(time:String) ={
      try{
        YYYYMMDDHHMM_TIME_FORMAT.parse(time.substring(time.indexOf("[")+1,
        time.lastIndexOf("]"))).getTime
      }catch {
        // Unparseable input falls back to epoch 0 (1970-01-01)
        case e:Exception =>{
          0L
        }
      }
    }

  def main(args: Array[String]): Unit = {
      println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }
}
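
The same thread-safety concern can also be addressed with the JDK's own java.time API. The following is a minimal sketch in Java, not the author's code: the class name LogTimeConverter is hypothetical, and unlike DateUtils it keeps the timestamp in the log's own UTC offset rather than converting it to the JVM's default time zone, and it returns null instead of the epoch when parsing fails.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical helper for illustration; not part of the original project.
final class LogTimeConverter {
    // DateTimeFormatter instances are immutable, so sharing them across threads is safe.
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
    private static final DateTimeFormatter TARGET =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts "[10/Nov/2016:00:01:02 +0800]" style text; returns null if it cannot be parsed. */
    static String convert(String bracketed) {
        int start = bracketed.indexOf('[');
        int end = bracketed.lastIndexOf(']');
        if (start < 0 || end <= start) {
            return null; // no bracketed timestamp present
        }
        try {
            OffsetDateTime parsed = OffsetDateTime.parse(bracketed.substring(start + 1, end), INPUT);
            return parsed.format(TARGET); // wall-clock time in the log's own offset
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("[10/Nov/2016:00:01:02 +0800]")); // prints 2016-11-10 00:01:02
    }
}
```

Returning null makes malformed timestamps easy to filter out later, whereas an epoch fallback silently produces rows dated 1970-01-01.
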
package com.rachel

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext

object SparkStatFormatJob {
  def main(args: Array[String]): Unit = {
    val spark =  SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("file:///f:/java/10000_access.log")
    //access.take(10).foreach(println)
    access.map(line => {
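      // Fields are space-separated: 0 = client IP, 3-4 = timestamp, 9 = traffic, 11 = URL
      val splits = line.split(" ")
      val ip = splits(0)
      // The timestamp "[10/Nov/2016:00:01:02 +0800]" contains a space, so rejoin its two halves
      val time = splits(3)+" "+splits(4)

      val url = splits(11).replaceAll("\"","")
      val traffic = splits(9)
     // (ip,DateUtils.parse(time),url)
      // Output columns, tab-separated: time, url, traffic, ip
      DateUtils.parse(time)+"\t"+url+"\t"+traffic+"\t"+ip+"\t"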
    }).saveAsTextFile("file:///f:/java/output_10000_access.log")

    spark.stop()

  }
}
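
The field extraction performed by the job above can be tried without Spark. This is a minimal Java sketch under stated assumptions: the class name AccessLogLineSketch and the SAMPLE line are made up for illustration (the real log layout is only known through the indexes 0, 3, 4, 9 and 11 used above), and the length check is an addition so that short lines are skipped instead of throwing.

```java
import java.util.Optional;

// Hypothetical illustration of the space-split field extraction; not the author's code.
final class AccessLogLineSketch {
    // Made-up sample line whose layout matches the indexes used by the Spark job.
    static final String SAMPLE =
            "10.100.0.1 - - [10/Nov/2016:00:01:02 +0800] \"GET /video/4500 HTTP/1.1\" 200 813 "
            + "\"www.imooc.com\" \"http://www.imooc.com/video/4500\" -";

    /** Returns {time, url, traffic, ip}, or empty when the line has too few fields. */
    static Optional<String[]> extract(String line) {
        String[] parts = line.split(" ");
        if (parts.length < 12) {
            return Optional.empty(); // index 11 (the URL) would be out of range
        }
        String ip = parts[0];
        String time = parts[3] + " " + parts[4];   // the timestamp itself contains a space
        String traffic = parts[9];
        String url = parts[11].replace("\"", "");  // strip the surrounding quotes
        return Optional.of(new String[] {time, url, traffic, ip});
    }

    public static void main(String[] args) {
        extract(SAMPLE).ifPresent(fields -> System.out.println(String.join("\t", fields)));
    }
}
```
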

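Parsing the Access Log

  • Parse the access log with Spark SQL
  • Extract the course ID and the content type
  • Resolve the city from the IP address: val city = IpUtils.getCity(ip)
  • Partition the output by day of access with Spark SQL
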
package com.rachel

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object AccessConvertUtil {
  val struct = StructType {
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  }

  /**
    * Converts one line of cleaned input into an output Row
    * @param log  one input record
    */
  def parseLog(log:String) = {

    try{
      val splits = log.split("\t")

      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      if(cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
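      val day = time.substring(0,10).replaceAll("-","")
      // The fields of this Row must line up with the fields in struct
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      // Caution: this one-field placeholder does not match the eight-field struct
      case e:Exception => Row(0)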
    }
  }
}
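
The URL handling inside parseLog is the part most likely to fail on real data, so it helps to look at it in isolation. The following Java sketch is hypothetical (the names CourseUrlSketch and CourseRef are not from the original) and is slightly stricter than the Scala code: it assumes the URL must start with the domain, whereas parseLog uses indexOf and therefore also accepts the domain appearing mid-string.

```java
// Hypothetical illustration of the cmsType / cmsId extraction; not the author's code.
final class CourseUrlSketch {
    static final String DOMAIN = "http://www.imooc.com/";

    /** Content type and numeric id taken from a URL such as http://www.imooc.com/video/4500. */
    static final class CourseRef {
        final String type;
        final long id;

        CourseRef(String type, long id) {
            this.type = type;
            this.id = id;
        }
    }

    /** Returns null when the URL is not of the form DOMAIN + "<type>/<numeric id>". */
    static CourseRef parse(String url) {
        if (!url.startsWith(DOMAIN)) {
            return null; // assumption: only main-site URLs are of interest
        }
        String[] parts = url.substring(DOMAIN.length()).split("/");
        if (parts.length < 2) {
            return null; // no id segment
        }
        try {
            return new CourseRef(parts[0], Long.parseLong(parts[1]));
        } catch (NumberFormatException e) {
            return null; // id segment is not a plain number
        }
    }

    public static void main(String[] args) {
        CourseRef ref = parse("http://www.imooc.com/video/4500");
        System.out.println(ref.type + " " + ref.id); // prints video 4500
    }
}
```
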
package com.rachel

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkStatCleanJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob")
      .config("spark.sql.parquet.compression.codec","gzip")
      .master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("file:///f:/java/access.log")
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

//  accessDF.printSchema()
//  accessDF.show(false)
    accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
      .partitionBy("day").save("file:///f:/java/imooc/clean2")
  }
}

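This step uses the ipdatabase project to resolve IP addresses, so that project must first be built and installed as a dependency of our own project.

Using an Open-Source Project from GitHub

  1. git clone https://github.com/wzhe06/ipdatabase
  2. # Build the downloaded project (run from the project directory)
    mvn clean package -DskipTests
  3. # Install the resulting jar into the local Maven repository
    mvn install:install-file -Dfile=F:/开发文档/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstart -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar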

Statistical Analysis of the Logs

Top-N Most Popular Videos by Access Count

package com.rachel

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object TopNStatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .appName("TopNStatJob")
      .master("local[2]").getOrCreate()
    val accessDF =  spark.read.format("parquet").load("file:///f:/java/imooc/clean2")
    
//  accessDF.printSchema()
//  accessDF.show(false)
    
    videoAccessTopNStat(spark,accessDF)
    spark.stop()
  }

  /**
    * Top-N most popular courses.
    * A DataFrame can be queried in two ways: with the function API or with SQL.
    * @param spark
    * @param accessDF
    */
  def videoAccessTopNStat(spark:SparkSession,accessDF:DataFrame): Unit ={
    import  spark.implicits._
//  val videoAccessTopDF =  accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
//  .groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
   
    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopDF = spark.sql("select day, cmsId, count(1) as times from access_logs " +
      "where day = '20170511' and cmsType = 'video' " +
      "group by day, cmsId order by times desc")
    
    videoAccessTopDF.show(false)
  }
}

The result looks like this:

+--------+-----+------+
|day     |cmsId|times |
+--------+-----+------+
|20170511|14540|111027|
|20170511|4000 |55734 |
|20170511|14704|55701 |
|20170511|14390|55683 |
|20170511|14623|55621 |
|20170511|4600 |55501 |
|20170511|4500 |55366 |
|20170511|14322|55102 |
+--------+-----+------+
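
What the query computes (group by course ID, count, sort descending) can be sketched on an in-memory list. This Java example is an illustration only: TopNCountSketch is a hypothetical name, the input is a plain list of course IDs rather than a DataFrame, and the order of courses with equal counts is unspecified.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory equivalent of "group by cmsId order by times desc"; not the author's code.
final class TopNCountSketch {
    /** Returns the n most frequent ids with their counts, most frequent first. */
    static List<Map.Entry<Long, Long>> topN(List<Long> cmsIds, int n) {
        // Step 1: count how often each id occurs.
        Map<Long, Long> counts = cmsIds.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        // Step 2: sort by count, descending, and keep the first n entries.
        return counts.entrySet().stream()
                .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> visits = Arrays.asList(4000L, 14540L, 14540L, 4500L, 14540L, 4000L);
        for (Map.Entry<Long, Long> e : topN(visits, 2)) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```
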
  • Write the results to MySQL. The code is built from the bottom layer up.

1. Write a MySQL connection utility class

package com.rachel

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
  * MySQL utility class
  */
object MySQLUtils {
  /**
    * Opens a database connection
    */
  def getConnection()={
      DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=root")

  }
  /**
    * Releases the statement and the connection
    * @param connection
    * @param pstmt
    */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
}

2. Create the database table

create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null, 
times bigint(10) not null,  
primary key (day, cms_id)
);

3. Add the JDBC driver dependency to pom.xml

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

4. Define the entity class that mirrors the table structure

package com.rachel

/**
  * Entity class for per-day course access counts
  */
case class DayVideoAccessStat (day: String, cmsId: Long, times: Long)

5. Implement the DAO layer

package com.rachel

import java.sql.{Connection, PreparedStatement}


import scala.collection.mutable.ListBuffer

object StatDAO {
  /**
    * Saves a batch of DayVideoAccessStat records to the database
    *
    * @param list
    */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {

      connection = MySQLUtils.getConnection()
      // Use batching: switch to manual commit
      connection.setAutoCommit(false)

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }
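      // Execute the batch
      pstmt.executeBatch()
      // Commit manually
      connection.commit()
    }
    catch {
      case e: Exception => e.printStackTrace()
    }
    finally {
      // Always release the statement and the connection
      MySQLUtils.release(connection, pstmt)
    }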
  }
}

6. Call the DAO to write the results to MySQL

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer
object TopNStatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .appName("TopNStatJob")
      .master("local[2]").getOrCreate()
    val accessDF =  spark.read.format("parquet").load("file:///f:/java/imooc/clean2")

//  accessDF.printSchema()
//  accessDF.show(false)
    videoAccessTopNStat(spark,accessDF)
    spark.stop()
  }

  /**
    * Top-N most popular courses.
    * A DataFrame can be queried in two ways: with the function API or with SQL.
    * @param spark
    * @param accessDF
    */
  def videoAccessTopNStat(spark:SparkSession,accessDF:DataFrame): Unit ={
    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopDF = spark.sql("select day, cmsId, count(1) as times from access_logs " +
      "where day = '20170511' and cmsType = 'video' " +
      "group by day, cmsId order by times desc")
    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
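          /**
            * Do not insert into the database here, one row at a time;
            * collect the rows and insert them once per partition instead
            */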
          list.append(DayVideoAccessStat(day, cmsId, times))
        })
        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }
  }
}

7. After running the job, query the table; the result is shown below

mysql> select * from day_video_access_topn_stat;
+----------+--------+--------+
| day      | cms_id | times  |
+----------+--------+--------+
| 20170511 |   4000 |  55734 |
| 20170511 |   4500 |  55366 |
| 20170511 |   4600 |  55501 |
| 20170511 |  14322 |  55102 |
| 20170511 |  14390 |  55683 |
| 20170511 |  14540 | 111027 |
| 20170511 |  14623 |  55621 |
| 20170511 |  14704 |  55701 |
+----------+--------+--------+
8 rows in set (0.00 sec)

Top-N Most Popular imooc Courses by City

1. Compute the result

  /**
    * Top-N most popular courses on the imooc main site, by city
    *
    * @param spark
    * @param accessDF
    */
  def cityAccessTopNStat(spark:SparkSession,accessDF:DataFrame)={
    import spark.implicits._
    val cityAccessTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day","city","cmsId")
      .agg(count("cmsId").as("times"))
    cityAccessTopNDF.show(false)
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
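/*
    ROW_NUMBER() over (PARTITION BY xx ORDER BY ** DESC) as row_number
    groups the rows by column xx, sorts each group by column **, and numbers the rows within each group.
    Window requires: import org.apache.spark.sql.expressions.Window
*/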
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <=3") //.show(false)  //Top3
  }
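
The effect of row_number() over a window partitioned by city can be reproduced on plain collections, which makes the ranking rule easier to see. The sketch below is a hypothetical Java illustration (TopNPerGroupSketch and CityStat are made-up names, and the input rows are invented): it groups rows by city, sorts each group by times in descending order, numbers the rows from 1, and keeps those ranked at most n.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of "row_number() over (partition by city order by times desc)".
final class TopNPerGroupSketch {
    static final class CityStat {
        final String city;
        final long cmsId;
        final long times;
        final int rank; // 0 means "not ranked yet"

        CityStat(String city, long cmsId, long times, int rank) {
            this.city = city;
            this.cmsId = cmsId;
            this.times = times;
            this.rank = rank;
        }
    }

    /** Keeps the n most visited courses of every city and assigns ranks 1..n. */
    static List<CityStat> topPerCity(List<CityStat> rows, int n) {
        // Partition step: one list of rows per city (TreeMap only to get a stable city order).
        Map<String, List<CityStat>> byCity = new TreeMap<>();
        for (CityStat row : rows) {
            byCity.computeIfAbsent(row.city, key -> new ArrayList<>()).add(row);
        }
        List<CityStat> result = new ArrayList<>();
        for (List<CityStat> group : byCity.values()) {
            // Order step: most visited first within the city.
            group.sort(Comparator.comparingLong((CityStat r) -> r.times).reversed());
            // Numbering and filter step: rank is the 1-based position, keep rank <= n.
            for (int i = 0; i < group.size() && i < n; i++) {
                CityStat r = group.get(i);
                result.add(new CityStat(r.city, r.cmsId, r.times, i + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<CityStat> rows = Arrays.asList(
                new CityStat("A", 1, 10, 0), new CityStat("A", 2, 30, 0),
                new CityStat("A", 3, 20, 0), new CityStat("A", 4, 5, 0),
                new CityStat("B", 1, 7, 0));
        for (CityStat r : topPerCity(rows, 3)) {
            System.out.println(r.city + " " + r.cmsId + " " + r.times + " rank=" + r.rank);
        }
    }
}
```

As with row_number(), rows with equal counts still receive distinct consecutive ranks; a rank() or dense_rank() style numbering would treat ties differently.
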

2. Create the database table

mysql> create table day_video_city_access_topn_stat (
    -> day varchar(8) not null,
    -> cms_id bigint(10) not null,
    -> city varchar(20) not null,
    -> times bigint(10) not null,
    -> times_rank int not null,
    -> primary key (day, cms_id, city)
    -> );
Query OK, 0 rows affected (0.56 sec)

3. Create the corresponding entity class

package com.rachel

case class DayCityVideoAccessStat(day:String, cmsId:Long, city:String,times:Long,timesRank:Int)

4. Write the DAO layer

 /**
    * Saves a batch of DayCityVideoAccessStat records to the database
    */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // switch to manual commit

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

 5. Call the DAO layer to write the results to MySQL

    /**
     * Write the results to MySQL
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }

Verify the result in MySQL:

mysql> select * from day_video_city_access_topn_stat order by city desc,times_rank asc;
+----------+--------+-----------+-------+------------+
| day      | cms_id | city      | times | times_rank |
+----------+--------+-----------+-------+------------+
| 20170511 |  14540 | 浙江省    | 22435 |          1 |
| 20170511 |  14322 | 浙江省    | 11151 |          2 |
| 20170511 |  14390 | 浙江省    | 11110 |          3 |
| 20170511 |  14540 | 广东省    | 22115 |          1 |
| 20170511 |  14623 | 广东省    | 11226 |          2 |
| 20170511 |  14704 | 广东省    | 11216 |          3 |
| 20170511 |  14540 | 安徽省    | 22149 |          1 |
| 20170511 |  14390 | 安徽省    | 11229 |          2 |
| 20170511 |  14704 | 安徽省    | 11162 |          3 |
| 20170511 |  14540 | 北京市    | 22270 |          1 |
| 20170511 |   4600 | 北京市    | 11271 |          2 |
| 20170511 |  14390 | 北京市    | 11175 |          3 |
| 20170511 |  14540 | 上海市    | 22058 |          1 |
| 20170511 |  14704 | 上海市    | 11219 |          2 |
| 20170511 |   4000 | 上海市    | 11182 |          3 |
+----------+--------+-----------+-------+------------+

Top-N Most Popular imooc Courses by Traffic

1. Compute the result

  /**
    * Statistics by traffic
    */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF:DataFrame): Unit = {
    import spark.implicits._
    val cityAccessTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day","cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)
  }

2. Create the database table

create table day_video_traffics_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day, cms_id)
);

3. Create the entity class

package com.rachel

case class DayVideoTrafficsStat(day:String,cmsId:Long,traffics:Long)

4. Implement the DAO

  /**
    * Saves a batch of DayVideoTrafficsStat records to the database
    */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // switch to manual commit

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

5. Call the DAO to insert the data into the database

 /**
   * Write the results to MySQL
  */
    try {
      cityAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId,traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e:Exception => e.printStackTrace()
    }

6. Query the data in MySQL

mysql> select * from  day_video_traffics_topn_stat
    -> ;
+----------+--------+----------+
| day      | cms_id | traffics |
+----------+--------+----------+
| 20170511 |   4000 | 27847261 |
| 20170511 |   4500 | 27877433 |
| 20170511 |   4600 | 27777838 |
| 20170511 |  14322 | 27592386 |
| 20170511 |  14390 | 27895139 |
| 20170511 |  14540 | 55454898 |
| 20170511 |  14623 | 27822312 |
| 20170511 |  14704 | 27737876 |
+----------+--------+----------+
8 rows in set (0.00 sec)

Tuning Notes

  • Control the size of output files with coalesce
  • Adjust the data type of the partition column:
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")

When the following code runs:

val accessDF =  spark.read.format("parquet").load("file:///f:/java/imooc/clean2")
accessDF.printSchema()

the schema printed to the console is:

	root
	 |-- url: string (nullable = true)
	 |-- cmsType: string (nullable = true)
	 |-- cmsId: long (nullable = true)
	 |-- traffic: long (nullable = true)
	 |-- ip: string (nullable = true)
	 |-- city: string (nullable = true)
	 |-- time: string (nullable = true)
	 |-- day: integer (nullable = true)

whereas the struct defined in the class is:

  val struct = StructType {
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  }

The day column was defined as StringType, but it shows up as integer in the schema that is read back.

The official Spark documentation explains why:

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types and string type are supported. Sometimes users may not want to automatically infer the data types of the partitioning columns. For these use cases, the automatic type inference can be configured by spark.sql.sources.partitionColumnTypeInference.enabled, which is default to true. When type inference is disabled, string type will be used for the partitioning columns.
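
To make the effect concrete, here is a deliberately simplified Java illustration of the idea behind partition type inference. It is not Spark's implementation: PartitionTypeSketch and inferType are hypothetical names, and only the integer, long, double and string cases are modeled. It shows why a directory value such as day=20170511 is read back as an integer when inference is enabled and as a string when it is disabled.

```java
// Simplified, hypothetical model of partition-column type inference; NOT Spark's actual code.
final class PartitionTypeSketch {
    /** Guesses a type name for a raw partition value taken from a directory such as day=20170511. */
    static String inferType(String rawValue, boolean inferenceEnabled) {
        if (!inferenceEnabled) {
            return "string"; // with inference disabled, partition columns stay strings
        }
        try {
            Integer.parseInt(rawValue);
            return "integer";
        } catch (NumberFormatException ignored) {
            // not an int, try the next wider type
        }
        try {
            Long.parseLong(rawValue);
            return "long";
        } catch (NumberFormatException ignored) {
            // not a long either
        }
        try {
            Double.parseDouble(rawValue);
            return "double";
        } catch (NumberFormatException ignored) {
            // not numeric at all
        }
        return "string";
    }

    public static void main(String[] args) {
        System.out.println(inferType("20170511", true));  // prints integer
        System.out.println(inferType("20170511", false)); // prints string
    }
}
```
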
  • Batch commits to MySQL. When writing to MySQL, disable auto-commit with
    connection.setAutoCommit(false),
    for (ele <- list) {
      pstmt.setString(1, ele.day)
      pstmt.setLong(2, ele.cmsId)
      pstmt.setLong(3, ele.times)
      pstmt.addBatch()
    }
    // Execute the batch
    pstmt.executeBatch()
    // Commit manually
    connection.commit()
    This avoids committing to the relational database once per row, which improves throughput.
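
The batch pattern can be written compactly with plain JDBC. The following Java sketch is one possible shape under stated assumptions, not the author's DAO: BatchInsertSketch and Stat are hypothetical names, the caller supplies an open Connection, and the rollback on failure plus the restoring of the previous auto-commit setting are additions that the original code does not have.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical batch-insert helper for illustration; not part of the original project.
final class BatchInsertSketch {
    static final String SQL =
            "insert into day_video_access_topn_stat(day, cms_id, times) values (?,?,?)";

    static final class Stat {
        final String day;
        final long cmsId;
        final long times;

        Stat(String day, long cmsId, long times) {
            this.day = day;
            this.cmsId = cmsId;
            this.times = times;
        }
    }

    /** Inserts all rows in one batch and one transaction; rolls back if anything fails. */
    static void insertAll(Connection connection, List<Stat> stats) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // one commit for the whole batch
        try (PreparedStatement pstmt = connection.prepareStatement(SQL)) {
            for (Stat s : stats) {
                pstmt.setString(1, s.day);
                pstmt.setLong(2, s.cmsId);
                pstmt.setLong(3, s.times);
                pstmt.addBatch(); // queue the row instead of executing it immediately
            }
            pstmt.executeBatch();
            connection.commit();
        } catch (SQLException e) {
            connection.rollback(); // leave no half-written batch behind
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

The try-with-resources statement closes the PreparedStatement automatically; closing the Connection is left to the caller, which is why it does not appear here.
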
    

Visualization with ECharts

  • Displaying static data

1. Create a J2EE web project in IDEA


2. Test that a static chart renders, to verify that ECharts works

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>test</title>
    <!-- Include the ECharts file -->
    <script src="js/echarts.min.js"></script>
</head>
<body>
    <!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // Initialize an ECharts instance on the prepared DOM element
        var myChart = echarts.init(document.getElementById('main'));
        var option = {
            title : {
                text: 'Traffic Sources of a Website',
                subtext: 'Purely fictional',
                x:'center'
            },
            tooltip : {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            legend: {
                orient: 'vertical',
                left: 'left',
                data: ['Direct','Email marketing','Affiliate ads','Video ads','Search engine']
            },
            series : [
                {
                    name: 'Traffic source',
                    type: 'pie',
                    radius : '55%',
                    center: ['50%', '60%'],
                    data:[
                        {value:335, name:'Direct'},
                        {value:310, name:'Email marketing'},
                        {value:234, name:'Affiliate ads'},
                        {value:135, name:'Video ads'},
                        {value:1548, name:'Search engine'}
                    ],
                    itemStyle: {
                        emphasis: {
                            shadowBlur: 10,
                            shadowOffsetX: 0,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    }
                }
            ]
        };
        myChart.setOption(option);
    </script>
</body>
</html>

3. View the result


Displaying Dynamic Data

1. Add the dependencies

  <dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
  </dependency>

  <dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>jsp-api</artifactId>
    <version>2.0</version>
  </dependency>

  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
  </dependency>

  <dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
  </dependency>

2. Create the servlet. This time the code is built from the top layer down.

package com.rachel.web;

import com.rachel.dao.VideoAccessTopNDAO;
import com.rachel.domain.VideoAccessTopN;
import net.sf.json.JSONArray;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

public class VideoAccessTopNServlet extends HttpServlet {
    private VideoAccessTopNDAO dao;

    @Override
    public void init() throws ServletException {
        dao = new VideoAccessTopNDAO();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String day = req.getParameter("day");

        List<VideoAccessTopN> results =  dao.query(day);
        JSONArray json = JSONArray.fromObject(results);

        resp.setContentType("application/json;charset=utf-8");

        PrintWriter writer = resp.getWriter();
        writer.println(json);
        writer.flush();
        writer.close();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doGet(req, resp);
    }
}

3. Create the DAO

package com.rachel.dao;

import com.rachel.domain.VideoAccessTopN;
import com.rachel.utils.MySQLUtils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VideoAccessTopNDAO {
    static Map<String,String> courses = new HashMap<String,String>();
    static {
        courses.put("4000", "MySQL Optimization");
        courses.put("4500", "Crontab");
        courses.put("4600", "Swift");
        courses.put("14540", "SpringData");
        courses.put("14704", "R");
        courses.put("14390", "Machine Learning");
        courses.put("14322", "redis");
        // Note: key 14390 is repeated, so this entry overwrites "Machine Learning" above
        courses.put("14390", "Neural Networks");
        courses.put("14623", "Docker");
    }

    /**
     * Looks up the course name for a course ID
     */
    public String getCourseName(String id) {
        return courses.get(id);
    }


    /**
     * Queries the five most popular courses for the given day
     * @param day
     */
    public List<VideoAccessTopN> query(String day) {
        List<VideoAccessTopN> list = new ArrayList<VideoAccessTopN>();

        Connection connection = null;
        PreparedStatement psmt = null;
        ResultSet rs = null;

        try {
            connection = MySQLUtils.getConnection();
            String sql = "select cms_id ,times  from  day_video_access_topn_stat where day =? order by times desc limit 5";
            psmt = connection.prepareStatement(sql);
            psmt.setString(1, day);

            rs = psmt.executeQuery();

            VideoAccessTopN domain = null;
            while(rs.next()) {
                domain = new VideoAccessTopN();
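                /**
                 * TODO: the page should show the course name, but at this point we only have the course ID.
                 *
                 * How do we get the course name from the course ID?
                 * The ID-to-name mapping normally lives in a relational database.
                 */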
                domain.setName(getCourseName(rs.getLong("cms_id")+""));
                domain.setValue(rs.getLong("times"));

                list.add(domain);
            }

        }catch (Exception e) {
            e.printStackTrace();
        } finally {
            MySQLUtils.release(connection, psmt, rs);
        }
        return list;
    }

    public static void main(String[] args) {
        VideoAccessTopNDAO dao = new VideoAccessTopNDAO();
        List<VideoAccessTopN> list = dao.query("20170511");
        for(VideoAccessTopN result: list) {
            System.out.println(result.getName() + " , " + result.getValue());
        }
    }

}
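
One detail of the lookup above is worth a note: getCourseName returns null for any ID missing from the map, and that null then ends up as the slice name in the chart. A minimal Java sketch of a lookup with a fallback label follows. The class name CourseNameSketch and the "course-<id>" label format are assumptions made for illustration, and only two of the mappings are repeated here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup with a fallback label; not the author's code.
final class CourseNameSketch {
    private static final Map<Long, String> NAMES = new HashMap<>();

    static {
        NAMES.put(4000L, "MySQL Optimization");
        NAMES.put(4500L, "Crontab");
    }

    /** Returns the known course name, or "course-<id>" when the id has no mapping. */
    static String nameOf(long cmsId) {
        return NAMES.getOrDefault(cmsId, "course-" + cmsId);
    }

    public static void main(String[] args) {
        System.out.println(nameOf(4500L));  // prints Crontab
        System.out.println(nameOf(99999L)); // prints course-99999
    }
}
```
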

4. Create the JavaBean

package com.rachel.domain;

public class VideoAccessTopN {
    private String name;
    private long value ;


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

5. Connect to the database

package com.rachel.utils;

import java.sql.*;

public class MySQLUtils {
    private static final String USERNAME = "root";

    private static final String PASSWORD = "root";

    private static final String DRIVERCLASS = "com.mysql.jdbc.Driver";

    private static final String URL = "jdbc:mysql://localhost:3306/imooc_project";


    /**
     * Opens a database connection
     */
    public static Connection getConnection() {
        Connection connection = null;
        try {
            Class.forName(DRIVERCLASS);
            connection = DriverManager.getConnection(URL,USERNAME,PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return connection;
    }


    /**
     * Releases the resources
     */
    public static void release(Connection connection, PreparedStatement pstmt, ResultSet rs) {
        if(rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if(pstmt != null) {
            try {
                pstmt.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if(connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(getConnection());
    }
}

6. Because the back end is a servlet, the front end calls it via AJAX: include jQuery and modify the page as follows

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>test</title>
    <!-- Include the ECharts file -->
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery.js"></script>
</head>
<body>
    <!-- Prepare a DOM element with an explicit size (width and height) for ECharts -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // Initialize an ECharts instance on the prepared DOM element
        var myChart = echarts.init(document.getElementById('main'));
        var option = {
            title : {
                text: 'Top-N Most Popular Courses',
                x:'center'
            },
            tooltip : {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            legend: {
                orient: 'vertical',
                left: 'left',
                data: []
            },
            series : [
                {
                    name: 'Access count',
                    type: 'pie',
                    radius : '55%',
                    center: ['50%', '60%'],
                    data:(function(){
                        var courses = [];
                        $.ajax({
                            type:"GET",
                            url:"/stat?day=20170511",
                            dataType:'json',
                            async:false,
                            success:function(result) {
                                for(var i=0;i<result.length; i++){
                                    courses.push({"value": result[i].value,"name":result[i].name});
                                }
                            }
                        })
                        return courses;
                    })(),
                    itemStyle: {
                        emphasis: {
                            shadowBlur: 10,
                            shadowOffsetX: 0,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    }
                }
            ]
        };
        myChart.setOption(option);
    </script>
</body>
</html>

7. Map the URL to the servlet class in web.xml

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>Archetype Created Web Application</display-name>
  <servlet>
    <servlet-name>stat</servlet-name>
    <servlet-class>com.rachel.web.VideoAccessTopNServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>stat</servlet-name>
    <url-pattern>/stat</url-pattern>
  </servlet-mapping>
</web-app>

8. Restart the server and check the result


Displaying Data with Zeppelin

1. Download Zeppelin

Download URL: http://www.apache.org/dyn/closer.cgi/zeppelin/zeppelin-0.8.0/zeppelin-0.8.0-bin-all.tgz

2. Unpack the archive

tar -zxvf zeppelin-0.8.0-bin-all.tgz

3. Start the service

$ cd zeppelin-0.8.0-bin-all/bin
$ ./zeppelin-daemon.sh start

4. Open http://192.168.1.6:8080 in a browser

5. Configure the JDBC connection to MySQL


6. Create a new note
