SparkSQL (Part 1)

Date: 2022-02-15 00:27:42

I. Overview

 

Components
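Broadly, Spark SQL consists of the DataFrame/Dataset API, the Catalyst optimizer, and a set of pluggable data source connectors, all running on top of the Spark core engine.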


Execution Mechanism
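Catalyst runs every query through the same pipeline: the SQL text or DataFrame calls are parsed into an unresolved logical plan, the analyzer resolves names against the catalog, the optimizer applies rule-based rewrites, and the planner produces a physical plan that is finally compiled down to RDD operations. Each stage can be inspected with explain(true); a minimal sketch (the table and column names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Catalyst Demo").master("local").getOrCreate()

// A toy table so the plans have something to show.
spark.range(0, 1000).selectExpr("id", "id % 7 AS bucket").createOrReplaceTempView("t")

// Prints all four stages: parsed logical plan, analyzed logical plan,
// optimized logical plan, and the physical plan.
spark.sql("SELECT bucket, COUNT(*) AS cnt FROM t WHERE id > 100 GROUP BY bucket").explain(true)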


Repost: SparkSQL – Getting to Know Catalyst from Zero to One: https://blog.csdn.net/qq_36421826/article/details/81988157

See also: A Deep Dive into Spark SQL's Catalyst Optimizer (translated).

Higher Efficiency
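Because DataFrame and SQL programs are compiled by Catalyst into the same optimized physical plan, queries written in Scala, Java, Python, or R run equally fast, whereas hand-written RDD code is executed as-is and its performance depends entirely on the author.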


Query Optimization


 Optimization: apply the filter earlier (predicate pushdown)
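A minimal sketch of this, with made-up tables: the filter is written after the join, but the optimized logical plan printed by explain(true) shows it pushed below the join, so fewer rows are shuffled and joined.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Pushdown Demo").master("local").getOrCreate()
import spark.implicits._

val users = spark.range(100).selectExpr("id AS userId", "id % 10 AS deptId")
val depts = spark.range(10).selectExpr("id AS deptId", "concat('dept-', id) AS deptName")

// Written order: join first, then filter. Catalyst rewrites this to
// filter `users` first and join the smaller result.
users.join(depts, "deptId")
  .filter($"userId" > 50)
  .explain(true)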

 

 Data source optimization
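For columnar sources such as Parquet the optimizer goes one step further: only the referenced columns are read (column pruning) and suitable filters are pushed into the scan itself, visible as PushedFilters in the physical plan. A sketch, reusing the spark session from above; the file path and columns are hypothetical:

import org.apache.spark.sql.functions.col

// Hypothetical Parquet file with columns userId and eventType.
val events = spark.read.parquet("data/events.parquet")

// The FileScan node in the physical plan lists the pruned ReadSchema
// and the pushed-down filter, e.g. PushedFilters: [EqualTo(eventType,click)].
events.filter(col("eventType") === "click")
  .select("userId")
  .explain()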


 Compile-time optimization: code generation
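Whole-stage code generation fuses a pipeline of operators into a single generated Java function, eliminating virtual function calls and intermediate row objects. In Spark 3.x the generated source can be printed directly; a small sketch, again reusing the spark session from above:

val df = spark.range(0, 1000000)
  .selectExpr("id * 2 AS doubled")
  .filter("doubled % 3 = 0")

// Operators prefixed with * in the physical plan run inside one generated
// function; explain("codegen") also dumps the generated Java source.
df.explain("codegen")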


Dataset and DataFrame
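The practical difference: a DataFrame is just Dataset[Row], so column references are only checked at runtime, while a typed Dataset checks field access at compile time. A minimal sketch with an illustrative case class:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Person(name: String, age: Int)  // illustrative schema

val spark = SparkSession.builder().appName("DsVsDf").master("local").getOrCreate()
import spark.implicits._

val ds: Dataset[Person] = Seq(Person("alice", 29), Person("bob", 31)).toDS()
val df: DataFrame = ds.toDF()     // a DataFrame is a Dataset[Row]

ds.filter(_.age > 30).show()      // typed: `age` is checked at compile time
df.filter($"age" > 30).show()     // untyped: "age" is resolved at runtime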


Data Sources


 Parquet files
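A sketch of reading and writing Parquet, assuming a live SparkSession named spark; paths and column names are hypothetical. Parquet is Spark SQL's default format:

val people = spark.read.parquet("data/people.parquet")  // hypothetical input path
people.createOrReplaceTempView("people")

spark.sql("SELECT name FROM people WHERE age > 20")
  .write
  .parquet("out/adults.parquet")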


 JSON files
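Spark SQL reads line-delimited JSON and infers the schema from the records; a sketch with a hypothetical file:

// Each input line is one JSON object, e.g. {"name": "alice", "age": 29}
val people = spark.read.json("data/people.json")
people.printSchema()              // schema inferred from the data
people.where("age > 20").show()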


 Reading Hive tables
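Reading Hive tables requires a session built with enableHiveSupport(), so that Spark talks to the Hive metastore; a sketch, assuming a Hive table named src:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Demo")
  .enableHiveSupport()  // needs Hive dependencies and hive-site.xml on the classpath
  .getOrCreate()

spark.sql("SELECT key, value FROM src LIMIT 10").show()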


 External data sources: spark.read.format
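spark.read.format(...) is the generic entry point for built-in and third-party sources; a sketch with CSV and JDBC, where the path, URL, table, and credentials are all placeholders:

// CSV through the generic reader
val csv = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/people.csv")

// JDBC through the generic reader (the JDBC driver jar must be on the classpath)
val jdbc = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb")
  .option("dbtable", "people")
  .option("user", "root")
  .option("password", "secret")
  .load()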


II. Program Design

Typical Workflow
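The steps are always the same; the full examples below follow this pattern:

1. Build a SparkSession.
2. Load data into a DataFrame or Dataset.
3. Register a temporary view and/or apply transformations.
4. Query with SQL or the DataFrame DSL.
5. Collect, show, or write the result.
6. Stop the session.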


API: SQL and the DataFrame DSL
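Both APIs compile to the same Catalyst plans, so the choice is largely stylistic. A sketch of one query written both ways, using the accessLogs DataFrame and log schema from the examples below:

// SQL: query a registered temporary view
accessLogs.createOrReplaceTempView("logs")
val bySql = spark.sql(
  "SELECT responseCode, COUNT(*) AS total FROM logs GROUP BY responseCode")

// DataFrame DSL: the same query as method calls, no view required;
// the aggregate column is named "count"
val byDsl = accessLogs.groupBy("responseCode").count()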


Content size statistics: total content size, number of log lines, minimum and maximum content size

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark.read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine)
      .toDF()

    accessLogs.createOrReplaceTempView("logs")

    // Content size statistics: total size, number of log lines, min and max
    val contentSizeStats: Row = spark.sql(
      "SELECT SUM(contentSize), COUNT(*), MIN(contentSize), MAX(contentSize) FROM logs").first()
    val sum = contentSizeStats.getLong(0)
    val count = contentSizeStats.getLong(1)
    val min = contentSizeStats.getLong(2)
    val max = contentSizeStats.getLong(3)
    println("sum %s, count %s, min %s, max %s".format(sum, count, min, max))
    println("avg %s".format(sum / count))
    spark.close()


  }
}

ApacheAccessLog (the log-line parser shared by all of the examples)

package org.sparkcourse.log

case class ApacheAccessLog(ipAddress: String,
                           clientIdentd: String,
                           userId: String,
                           dateTime: String,
                           method: String,
                           endpoint: String,
                           protocol: String,
                           responseCode: Int,
                           contentSize: Long){
}

object ApacheAccessLog {
  // 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def parseLogLine(log: String): ApacheAccessLog = {
    log match {
      case PATTERN(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode, contentSize)
        => ApacheAccessLog(ipAddress, clientIdentd, userId, dateTime, method, endpoint, protocol, responseCode.toInt, contentSize.toLong)
      case _ => throw new RuntimeException(s"""Cannot parse log line: $log""")
    }
  }
}

 

Count the number of requests per response code

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")

    // Count the number of requests per response code
    val responseCodeToCount = spark.sql(
      "SELECT responseCode, COUNT(*) FROM logs GROUP BY responseCode LIMIT 100")
      .map(row => (row.getInt(0), row.getLong(1)))
      .collect()
    responseCodeToCount.foreach(println(_))
  }
}

 

Find IP addresses that accessed the server more than 10 times

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")

    // Find IP addresses that accessed the server more than 10 times
    val ipAddresses = spark.sql(
      "SELECT ipAddress, COUNT(*) AS total FROM logs GROUP BY ipAddress HAVING total > 10 LIMIT 100")
      .map(row => row.getString(0))
      .collect()
    ipAddresses.foreach(println(_))
  }
}

 

Find the most frequently accessed endpoints

package org.sparkcourse.log

import org.apache.spark.sql.{Row, SparkSession}

object LogAnalyzerSQL {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Log Analyzer")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val accessLogs = spark
      .read
      .textFile("data/weblog/apache.access.log")
      .map(ApacheAccessLog.parseLogLine).toDF()

    accessLogs.createOrReplaceTempView("logs")

    // Find the top 10 most frequently accessed endpoints
    val topEndpoints = spark.sql(
      "SELECT endpoint, COUNT(*) AS total FROM logs GROUP BY endpoint ORDER BY total DESC LIMIT 10")
      .map(row => (row.getString(0), row.getLong(1)))
      .collect()
    topEndpoints.foreach(println(_))

  }
}