spark(8)spark案例之WordCount、点击流日志分析、写入数据到mysql/hbase、IP地址查询

时间:2024-04-16 07:41:12

案例1:使用Java实现spark的wordCount

案例需求:

单词计数

第一步:创建maven工程,引入依赖

 <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.3</version>
    </dependency>
</dependencies>

第二步:代码开发

说明:

  1. 使用Java编写spark程序,其实跟scala的步骤是一样的,只不过写法有点变化而已。
  2. scala的RDD对应Java中的JavaRDD
  3. scala的SparkContext对应Java中的JavaSparkContext
  4. scala方法中的参数为函数时,在Java中要改成对象,因为Java是面向对象的,这是scala相对于Java非常不同的地方。
  5. 编写spark程序的大致步骤如下:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MyJavaSpark {
    public static void main(String[] args) {
        //1、创建spark Conf
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");

        //2、创建spark Context
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        //3、读取数据
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("F:\\test\\aa.txt", 2);

        //4、切分每一行数据为一个个单词
        final JavaRDD<String> wordsRDD = stringJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                String[] s1 = s.split(" ");
                return Arrays.asList(s1).iterator();
            }
        });

        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        //6、相同的单词累加1
        JavaPairRDD<String, Integer> resultRDD = wordAndOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //7、收集数据
        List<Tuple2<String, Integer>> collectRDD = resultRDD.collect();

        //8、打印数据
        for(Tuple2<String,Integer> t:collectRDD){
            System.out.println("单词:"+t._1+"次数:"+t._2);
        }

        //9、关闭资源
        javaSparkContext.stop();
    }
}

运行结果为:

单词:hive次数:1
单词:flink次数:1
单词:spark次数:4
单词:hadoop次数:3
单词:flume次数:1
单词:hbase次数:1

案例2:实现点击流日志数据分析

点击流日志数据:用户在网站的浏览行为记录

案例数据

资料中的access.log文件,文件里一行数据的格式大致如下:

60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
  1. 数据中的横杠-也是一个字段, 表示无
  2. 一行数据代表一次访问。
  3. 第1个字段是用户的ip地址

image-20200415221704529

统计PV

PV:页面浏览量,是网站各个网页被浏览的总次数。对应于access.log中的数据,一行数据就是一条浏览记录。

因此,要获取PV,实质是要统计access.log文件中行数。

import org.apache.spark.{SparkConf, SparkContext}

object PV {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
    val sc=new SparkContext(sparkconf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
    val pv=data.count();
    println(pv)
    sc.stop()
  }
}

运行结果为:

14619

统计UV

UV(Unique Visitor)是独立访客数。放在这里就是有多少个不同的ip地址的访客访问过网站,相同ip地址的访客,无论访问网站多少次,都只算入UV一次

因此,spark程序的大致步骤是:加载每一行数据,获取每一行数据的ip地址,对ip地址去重,然后统计ip数量。

代码开发:

import org.apache.spark.{SparkConf, SparkContext}

object PV {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
    val sc=new SparkContext(sparkconf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
    //获取每一行的ip地址:
    val rdd2=data.map(x=>x.split(" ")(0))
    //去重:
    val rdd3=rdd2.distinct()
    val uv=rdd3.count();
    println(uv)
    sc.stop()
  }
}

1050

获取被访问的TopN页面地址

数据文件里每一行数据代表一次访问,每一行数据的第11个字段是被访问的页面地址,如

60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

中的"http://cos.name/category/software/packages/"。

但是,有些行的数据是不完整的,可能没有第11个字段,或者第11个字段的值是 "-" ,因此,我们首先要进行数据的处理,然后再分析数据。

注意,"-"中的双引号是包括在数据里面的,千万别少写了,特别注意下面代码块中的第11行代码。

代码开发

import org.apache.spark.{SparkConf, SparkContext}

object PV {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
    val sc=new SparkContext(sparkconf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
    //处理数据,使得每一行数据至少有11个字段
    val data2=data.filter(x=>x.split(" ").length>10)
    //处理数据,使得每一行数据的第11个字段都不为 "-",注意,双引号也包括在里面
    val data3=data2.filter(x=>x.split(" ")(10)!="\"-\"")
    //获取第11个字段
    val rdd10=data3.map(x=>x.split(" ")(10))
    //每个计1
    val result1=rdd10.map(x=>(x,1))
    //统计
    val result2=result1.reduceByKey(_+_)
    //排序
    val sortRDD=result2.sortBy(x=>x._2,false)
    //获取Top5
    val finalRes=sortRDD.take(5)
    //打印:
    finalRes.foreach(println)
    sc.stop()
  }
}

运行结果为:

("http://blog.fens.me/category/hadoop-action/",547)
("http://blog.fens.me/",377)
("http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10",360)
("http://blog.fens.me/r-json-rjson/",274)
("http://blog.fens.me/angularjs-webstorm-ide/",271)

案例3:读取文件数据写入到mysql表中

创建maven工程

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.3</version>
    </dependency>
    <dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.38</version>
	</dependency>
</dependencies>

案例数据

1,tony,18
2,xiaoqiang,20
3,xiaoming,15
4,laowang,45

创建mysql表

在node03登录mysql,创建一个表,Person

mysql> create database demo1;
mysql> use demo1
mysql> create table person(id int,name varchar(15),age int);

通过foreach算子实现

大致步骤:加载数据--》处理数据后将数据封装到RDD中--》foreach遍历数据,创建mysql连接,写入数据

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeach {
  def main(args: Array[String]): Unit = {
    val sparkkconf=new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
    val sc=new SparkContext(sparkkconf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\person.txt")
    val data2:RDD[Array[String]]=data.map(x=>x.split(","))

    data2.foreach(t=> {
      var conne: Connection = null
      try {
        //创建连接
        conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
        //定义sql语句,?是占位符
        val sql1 = "insert into person(id,name,age) values(?,?,?)"
        //获取prepareStatement对象,这个对象可以对sql语句进行预编译
        val ps = conne.prepareStatement(sql1)
        //给sql语句的问号?赋值,1代表第一个问号,2代表第二个问号...
        ps.setString(1, t(0))
        ps.setString(2, t(1))
        ps.setString(3, t(2))
        //执行sql语句
        ps.execute()
      }catch {
        case ex:Exception =>println(ex.getMessage)
      }finally {
        if(conne!=null){conne.close()}
      }
    })
    sc.stop()
  }
}

查看mysql的person表,如下,已经写入成功:

mysql> select * from person;
+------+-----------+------+
| id   | name      | age  |
+------+-----------+------+
|    4 | laowang   |   45 |
|    1 | tony      |   18 |
|    2 | xiaoqiang |   20 |
|    3 | xiaoming  |   15 |
+------+-----------+------+

说明:

  1. 通过foreach算子来实现的话,观察代码,会发现,foreach每遍历一条数据,就会创建一个mysql连接,如果存在大量数据的话,无疑是很耗时的。
  2. 从person表可看到,插入的数据的顺序并不是跟源数据一致的,这是因为受到了多个分区并行执行的影响。

通过foreachPartition算子实现

通过foreachPartition来实现与foreach来实现的源代码差不多,只需要修改几个地方,代码如下:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeachPartition {
  def main(args: Array[String]): Unit = {
    val sparkkconf=new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
    val sc=new SparkContext(sparkkconf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\person.txt")
    val data2:RDD[Array[String]]=data.map(x=>x.split(","))

    data2.foreachPartition(iter=> {
      var conne: Connection = null
      try {
        //创建连接
        conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
        //定义sql语句,?是占位符
        val sql1 = "insert into person(id,name,age) values(?,?,?)"
        //获取prepareStatement对象,这个对象可以对sql语句进行预编译
        val ps = conne.prepareStatement(sql1)
        //给sql语句的问号?赋值,1代表第一个问号,2代表第二个问号...
        iter.foreach(t=>{
          ps.setString(1, t(0))
          ps.setString(2, t(1))
          ps.setString(3, t(2))

          ps.addBatch()
        })
        //执行sql语句
        ps.executeBatch()
      }catch {
        case ex:Exception =>println(ex.getMessage)
      }finally {
        if(conne!=null){conne.close()}
      }
    })
    sc.stop()
  }
}

再次查看person表:

mysql> select * from person;
+------+-----------+------+
| id   | name      | age  |
+------+-----------+------+
|    4 | laowang   |   45 |
|    1 | tony      |   18 |
|    2 | xiaoqiang |   20 |
|    3 | xiaoming  |   15 |
|    1 | tony      |   18 |
|    4 | laowang   |   45 |
|    2 | xiaoqiang |   20 |
|    3 | xiaoming  |   15 |
+------+-----------+------+

小结

  1. foreach算子实现获取得到一条一条的数据之后,然后进行获取对应的数据库连接,实现把数据插入到mysql表中,这里rdd中有N条数据,这里就需要与mysql数据库创建N次连接,它是比较浪费资源

  2. foreachPartition算子实现以分区为单位与mysql数据库来创建数据库连接,可以大大减少与mysql数据创建的连接数,有助于程序的性能提升。所以后期推荐大家使用foreachPartition算子

案例4:读取文件数据写入到hbase表中

案例数据

数据是资料中的users.dat文件,数据的大致格式如下,以::为分隔符,一共5个字段,分别是id,gender,age,position,code

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810

添加pom依赖

在之前案例的pom的基础上,添加以下依赖:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.1</version>
        </dependency>

创建hbase表

确保hbase、hadoop、zookeeper都正常开启,进入hbase shell,创建表person

start-hbase.sh

hbase shell

hbase(main):001:0> create \'person\',\'f1\',\'f2\'

代码开发

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.spark.{SparkConf, SparkContext}

object Data2Hbase {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setAppName("hbase").setMaster("local[2]")
    val sc=new SparkContext(sparkConf)
    val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\users.dat")
    val data2=data.map(x=>x.split("::"))
    data2.foreachPartition(iter=>{
      var conne:Connection=null
      try{
        val conf=HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181")
        conne=ConnectionFactory.createConnection(conf)
        val tablePerson=conne.getTable(TableName.valueOf("person"))
        iter.foreach(x=>{
          val put=new Put(x(0).getBytes())
          put.addColumn("f1".getBytes(),"gender".getBytes(),x(1).getBytes())
          put.addColumn("f1".getBytes(),"age".getBytes(),x(2).getBytes())
          put.addColumn("f1".getBytes(),"position".getBytes(),x(3).getBytes())
          put.addColumn("f1".getBytes(),"code".getBytes(),x(4).getBytes())
          tablePerson.put(put)
        })
      }catch {
        case ex:Exception =>println(ex.getMessage)
      }finally {if (conne!=null){conne.close()}}
    })
  }
}

查看hbase中的person表,部分数据如下:

scan \'person\'                                                                                  
999                 column=f1:age, timestamp=1586981613489, value=25                   
999                 column=f1:code, timestamp=1586981613489, value=62558               
999                 column=f1:gender, timestamp=1586981613489, value=M                 
999                 column=f1:position, timestamp=1586981613489, value=15   

案例5:实现ip地址查询

需求分析

在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

1579070050537

要想实现上面热点图效果,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

示意图:基站下放给用户可以上网的ip地址,通过这个ip地址就可以定位用户的坐标(经纬度)。

image-20200416043037480

案例数据

1、日志信息数据: 20090121000132.394251.http.format

各字段分别表示:时间戳|ip地址|.......,只需要留意前2个字段

1579070153331

2、城市ip段信息数据: ip.txt,类似于码表数据

开始数字和结束数字分别是开始ip和结束ip经过算法计算得到的值。

1579070232110

开发思路

1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,维度

2、 加载日志数据,获取ip信息,然后使用相同的算法将ip转换为数字,和ip段比较

3、 比较的时候采用二分法查找,找到对应的经度和维度

4、 然后对经度和维度做单词计数

广播变量

在本次的ip案例中,要将日志数据中的每个ip都拿去跟城市ip信息数据进行匹配,为每个日志数据中的ip匹配对应的经纬度,而如果每个task都加载一份城市ip信息数据到内存中的话,无疑是非常消耗内存的,因此需要将城市ip信息数据封装在广播变量里,作为共享数据

image-20200416123110870

代码实现

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CityIp {
  def ip2Long(ip:String):Long={
    val ipSpl:Array[String]=ip.split("\\.")
    var ipLong: Long = 0L
    for(i<-ipSpl){
      ipLong=i.toLong | ipLong<<8L
    }
    ipLong
  }
  def binarySearch(ipLong: Long, cityIp: Array[(String, String, String, String)]): Long ={
    //定义码表数组的起始下标:
    var startIndex=0
    //定义码表数组的结束下标:
    var endIndex=cityIp.length-1

    while(startIndex<=endIndex){
      val middleIndex=(startIndex+endIndex)/2
      //如果正好满足中间的元组的IP数值
      if(ipLong >= cityIp(middleIndex)._1.toLong && ipLong<=cityIp(middleIndex)._2.toLong){
        return middleIndex
      }
      if(ipLong > cityIp(middleIndex)._1.toLong){
        startIndex=middleIndex+1
      }
      if(ipLong<cityIp(middleIndex)._2.toLong){
        endIndex=middleIndex
      }
    }
    -1 //-1表示下标没有找到
  }
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setAppName("Ip").setMaster("local")
    val sc=new SparkContext(sparkconf)
    //加载ip码表数据
    val ipData=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\ip.txt")
    //处理ip码表数据
    val ipData2=ipData.map(x=>x.split("\\|")).map(x=>(x(2),x(3),x(x.length-2),x(x.length-2)))
    //创建广播变量
    val bdIP=sc.broadcast(ipData2.collect())
    //加载运营商日志数据
    val logData=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\20090121000132.394251.http.format")
    //处理运营商日志数据
    val logIps=logData.map(x=>x.split("\\|")(1))
    //遍历日志数据中的每个ip,将ip转为Long类型数值
    val andOneRDD:RDD[((String, String), Int)] =logIps.mapPartitions(iter=>{
      //获取广播变量的数据
      val cityIp:Array[(String,String,String,String)]=bdIP.value
      //遍历日志的ip,将ip转为数值
      iter.map(x=>{
        val ipLong=ip2Long(x)
        //获取ipLong在ip码表对应的索引数值(获取ipLong处在城市ipx信息表的第几行的ip数值区间)
        val index:Int=binarySearch(ipLong,cityIp).toInt
        //获取下标对应的经纬度等信息
        val resultJW: (String, String, String, String)=cityIp(index)
        //封装数据,作为返回值
        ((resultJW._3,resultJW._4),1)
      })

    })
    //相同的经纬度出现累加1
    val finalResult=andOneRDD.reduceByKey(_+_)

    //打印数据:
    finalResult.foreach(println)
  }
}

运行结果为:

((106.51107,106.51107),91)
((108.948024,108.948024),1824)
((114.502461,114.502461),383)
((106.27633,106.27633),36)
((102.712251,102.712251),126)
((107.08166,107.08166),29)
((116.405285,116.405285),1535)
((107.7601,107.7601),85)
((107.39007,107.39007),47)
((106.57434,106.57434),177)
((106.56347,106.56347),3)
((106.504962,106.504962),400)

小结

该案例比较贴近实际的真实需求,含金量是比较高,这里使用了广播变量知识点、二分查询、ip地址转成Long类型数字,大家多多练习!掌握spark中的RDD编程。