spark streaming从kafka获取数据,计算处理后存储到redis

时间:2023-01-22 20:47:17


摘要

本文主要实现一个简单sparkstreaming小栗子,整体流程是从kafka实时读取数据,计算pv,uv,以及sum(money)操作,最后将计算结果存入redis中,用sql表述大概就是

select time,page,count(*),count(distinct user) uv,sum(money) from test group by page,time

样例数据格式:

user,page,money,time

[html]  view plain  copy
  1. smith,iphone4.html,578.02,1500618981283  
  2. andrew,mac.html,277.62,1500618981285  
  3. smith,note.html,388.56,1500618981285  

将数据push到kafka

启动kafka


造数据

[java]  view plain  copy
  1. package com.fan.spark.stream  
  2.    
  3. import java.text.DecimalFormat  
  4. import java.util.Properties  
  5.    
  6. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}  
  7.    
  8. import scala.util.Random  
  9. /** 
  10.   * Created by http://www.fanlegefan.com on 17-7-21. 
  11.   */  
  12. object ProduceMessage {  
  13.    
  14.   def main(args: Array[String]): Unit = {  
  15.    
  16.     val props = newProperties()  
  17.     props.put("bootstrap.servers","localhost:9092")  
  18.     props.put("acks","all")  
  19.     props.put("retries","0")  
  20.     props.put("batch.size","16384")  
  21.     props.put("linger.ms","1")  
  22.     props.put("buffer.memory","33554432")  
  23.     props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")  
  24.     props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")  
  25.    
  26.     val producer = newKafkaProducer[String, String](props)  
  27.    
  28.     val users = Array("jack","leo","andy","lucy","jim","smith","iverson","andrew")  
  29.     val pages = Array("iphone4.html","huawei.html","mi.html","mac.html","note.html","book.html","fanlegefan.com")  
  30.     val df = newDecimalFormat("#.00")  
  31.     val random = newRandom()  
  32.     val num = 10  
  33.     for(i<- 0 to num ){  
  34.       val message = users(random.nextInt(users.length))+","+pages(random.nextInt(pages.length))+  
  35.       ","+df.format(random.nextDouble()*1000)+","+System.currentTimeMillis()  
  36.       producer.send(newProducerRecord[String, String]("test", Integer.toString(i),message))  
  37.       println(message)  
  38.     }  
  39.     producer.close()  
  40.   }  
  41. }  


控制台消费如下

[java]  view plain  copy
  1. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  
  2. andrew,book.html,309.58,1500620213384  
  3. jack,book.html,954.01,1500620213456  
  4. iverson,book.html,823.07,1500620213456  
  5. iverson,iphone4.html,486.76,1500620213456  
  6. lucy,book.html,14.00,1500620213457  
  7. iverson,note.html,206.30,1500620213457  
  8. jack,book.html,25.30,1500620213457  
  9. jim,iphone4.html,513.82,1500620213457  
  10. lucy,mac.html,677.29,1500620213457  
  11. smith,mi.html,571.30,1500620213457  
  12. lucy,iphone4.html,113.83,1500620213457  

计算pv,uv以及累计金额

因为数据要存入redis中,获取redis客户端代码如下

[java]  view plain  copy
  1. package com.fan.spark.stream  
  2.    
  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig  
  4. import redis.clients.jedis.JedisPool  
  5.    
  6. /** 
  7.   * Created by http://www.fanlegefan.com on 17-7-21. 
  8.   */  
  9. object RedisClient {  
  10.   val redisHost = "127.0.0.1"  
  11.   val redisPort = 6379  
  12.   val redisTimeout = 30000  
  13.    
  14.   lazy val pool = newJedisPool(newGenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)  
  15.   lazy val hook = newThread {  
  16.     override def run = {  
  17.       println("Execute hook thread: " + this)  
  18.       pool.destroy()  
  19.     }  
  20.   }  
  21.    
  22.   sys.addShutdownHook(hook.run)  
  23. }  

sparkstreaming 是按batch处理数据,例如设置batchDuration=10,则每批次处理10秒中内接收到的数据,计算pv的时候,直接count累加就可以;但是计算uv的时候,这10秒内出现的用户,在之前的batch中也可能出现,但是spark是按batch处理数据,没办法知道之前用户是否出现过,如果只是简单的累计的话,一天下来uv的数据会比真实的uv大很多,所以要解决这个问题就要引入HyperLogLog,还好redis已经提供了这个功能,具体使用情况直接看栗子

[java]  view plain  copy
  1. redis 127.0.0.1:6379> PFADD mykey a b c d e f g h i j  
  2. (integer) 1  
  3. redis 127.0.0.1:6379> PFCOUNT mykey  
  4. (integer) 10  

a b c d e f g h i j这些可以理解为user,每来一个user,我们就执行下pfadd user操作;使用pfcount key就可以直接获得去重后的uv,但是要注意的是这种算法是有误差的,查阅了相关文档误差大约在0.8%左右,用于计算uv,这种误差还是可以接受的,具体误差大家可以测试下,这里我就不测了


实时计算代码如下

[java]  view plain  copy
  1. package com.fan.spark.stream  
  2.    
  3. import java.text.SimpleDateFormat  
  4. import java.util.Date  
  5.    
  6. import kafka.serializer.StringDecoder  
  7. import org.apache.spark.streaming.kafka.KafkaUtils  
  8. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  9. import org.apache.spark.{SparkConf, SparkContext}  
  10.    
  11. /** 
  12.   * Created by http://www.fanlegefan.com on 17-7-21. 
  13.   */  
  14. object UserActionStreaming {  
  15.    
  16.   def main(args: Array[String]): Unit = {  
  17.     val df = newSimpleDateFormat("yyyyMMdd")  
  18.     val group = "test"  
  19.     val topics = "test"  
  20.    
  21.     val sparkConf = newSparkConf().setAppName("pvuv").setMaster("local[3]")  
  22.    
  23.     val sc = newSparkContext(sparkConf)  
  24.     val ssc = newStreamingContext(sc, Seconds(10))  
  25.     ssc.checkpoint("/home/work/IdeaProjects/sparklearn/checkpoint")  
  26.    
  27.     val topicSets = topics.split(",").toSet  
  28.     val kafkaParams = Map[String, String](  
  29.       "metadata.broker.list"-> "localhost:9092",  
  30.       "group.id"-> group  
  31.     )  
  32.     val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,  
  33.       kafkaParams, topicSets)  
  34.     stream.foreachRDD(rdd=>rdd.foreachPartition(partition=>{  
  35.       val jedis = RedisClient.pool.getResource  
  36.       partition.foreach(tuple=>{  
  37.         val line = tuple._2  
  38.         val arr = line.split(",")  
  39.         val user = arr(0)  
  40.         val page = arr(1)  
  41.         val money = arr(2)  
  42.         val day = df.format(newDate(arr(3).toLong))  
  43.         //uv  
  44.         jedis.pfadd(day  + "_"+ page , user)  
  45.         //pv  
  46.         jedis.hincrBy(day+"_pv", page, 1)  
  47.         //sum  
  48.         jedis.hincrByFloat(day+"_sum", page, money.toDouble)  
  49.       })  
  50.     }))  
  51.     ssc.start()  
  52.     ssc.awaitTermination()  
  53.   }  
  54. }  

在redis中查看结果

[java]  view plain  copy
  1. 127.0.0.1:6379> keys *  
  2. 1)"20170721_note.html"  
  3. 2)"20170721_book.html"  
  4. 3)"20170721_fanlegefan.com"  
  5. 4)"20170721_mac.html"  
  6. 5)"20170721_pv"  
  7. 6)"20170721_mi.html"  
  8. 7)"20170721_iphone4.html"  
  9. 8)"20170721_sum"  
  10. 9)"20170721_huawei.html"  

查看pv

[java]  view plain  copy
  1. 127.0.0.1:6379> HGETALL 20170721_pv  
  2.  1)"mi.html"  
  3.  2)"112"  
  4.  3)"note.html"  
  5.  4)"107"  
  6.  5)"fanlegefan.com"  
  7.  6)"124"  
  8.  7)"huawei.html"  
  9.  8)"122"  
  10.  9)"iphone4.html"  
  11. 10)"92"  
  12. 11)"mac.html"  
  13. 12)"103"  
  14. 13)"book.html"  
  15. 14)"135"  

查看sum

[java]  view plain  copy
  1. 127.0.0.1:6379> HGETALL 20170721_sum  
  2.  1)"mi.html"  
  3.  2)"56949.65999999999998948"  
  4.  3)"note.html"  
  5.  4)"56803.50999999999999801"  
  6.  5)"fanlegefan.com"  
  7.  6)"59622.50999999999999801"  
  8.  7)"huawei.html"  
  9.  8)"64456.50000000000000711"  
  10.  9)"iphone4.html"  
  11. 10)"48643.07000000000001094"  
  12. 11)"mac.html"  
  13. 12)"51693.17999999999998906"  
  14. 13)"book.html"  
  15. 14)"67724.17999999999999261"  

查看UV,测试数据只有8个user,所以uv都是8

[java]  view plain  copy
  1. 127.0.0.1:6379> PFCOUNT 20170721_huawei.html  
  2. (integer) 8  
  3. 127.0.0.1:6379> PFCOUNT 20170721_fanlegefan.com  
  4. (integer) 8  

现在数据已经在redis中,可以写个定时任务将数据push到mysql中,前端就可以展示了,实时计算大概是这么个思路


[java]  view plain  copy
  1. 基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。  
  2. 我们的应用场景是分析用户使用手机App的行为,描述如下所示:  
  3.   
  4. 手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列  
  5. 后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持  
  6. 经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析  
  7. Spark Streaming介绍  
  8.   
  9. Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展(Scalable)、高吞吐(High-throughput)等。  
  10. 在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所示(来自官网):  
  11. streaming-dstream  
  12. 我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions更详细内容,可以查看官网文档。  
  13. 同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。  
  14.   
  15. Kafka+Spark Streaming+Redis编程实践  
  16.   
  17. 下面,我们根据上面提到的应用场景,来编程实现这个实时计算应用。首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:  
  18.   
  19. 1  
  20. {"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}  
  21. 一个事件包含4个字段:  
  22.   
  23. uid:用户编号  
  24. event_time:事件发生时间戳  
  25. os_type:手机App操作系统类型  
  26. click_count:点击次数  
  27. 下面是我们实现的代码,如下所示:  


 
[java]  view plain  copy
  1. </pre><pre name="code" class="java"></pre><pre name="code" class="java">package org.shirdrn.spark.streaming.utils  
  2.   
  3. import java.util.Properties  
  4. import scala.util.Properties  
  5. import org.codehaus.jettison.json.JSONObject  
  6. import kafka.javaapi.producer.Producer  
  7. import kafka.producer.KeyedMessage  
  8. import kafka.producer.KeyedMessage  
  9. import kafka.producer.ProducerConfig  
  10. import scala.util.Random  
  11.   
  12. object KafkaEventProducer {  
  13.    
  14.   private val users = Array(  
  15.       "4A4D769EB9679C054DE81B973ED5D768""8dfeb5aaafc027d89349ac9a20b3930f",  
  16.       "011BBF43B89BFBF266C865DF0397AA71""f2a8474bf7bd94f0aabbd4cdd2c06dcf",  
  17.       "068b746ed4620d25e26055a9f804385f""97edfc08311c70143401745a03a50706",  
  18.       "d7f141563005d1b5d0d3dd30138f3f62""c8ee90aade1671a21336c721512b817a",  
  19.       "6b67c8c700427dee7552f81f3228c927""a95f22eabc4fd4b580c011a3161a9d9d")  
  20.        
  21.   private val random = new Random()  
  22.        
  23.   private var pointer = -1  
  24.    
  25.   def getUserID() : String = {  
  26.        pointer = pointer + 1  
  27.     if(pointer >= users.length) {  
  28.       pointer = 0  
  29.       users(pointer)  
  30.     } else {  
  31.       users(pointer)  
  32.     }  
  33.   }  
  34.    
  35.   def click() : Double = {  
  36.     random.nextInt(10)  
  37.   }  
  38.    
  39.   // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2  
  40.   // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list  
  41.   // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events  
  42.   // bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning  
  43.   def main(args: Array[String]): Unit = {  
  44.     val topic = "user_events"  
  45.     val brokers = "10.10.4.126:9092,10.10.4.127:9092"  
  46.     val props = new Properties()  
  47.     props.put("metadata.broker.list", brokers)  
  48.     props.put("serializer.class""kafka.serializer.StringEncoder")  
  49.      
  50.     val kafkaConfig = new ProducerConfig(props)  
  51.     val producer = new Producer[String, String](kafkaConfig)  
  52.      
  53.     while(true) {  
  54.       // prepare event data  
  55.       val event = new JSONObject()  
  56.       event  
  57.         .put("uid", getUserID)  
  58.         .put("event_time", System.currentTimeMillis.toString)  
  59.         .put("os_type""Android")  
  60.         .put("click_count", click)  
  61.        
  62.       // produce event message  
  63.       producer.send(new KeyedMessage[String, String](topic, event.toString))  
  64.       println("Message sent: " + event)  
  65.        
  66.       Thread.sleep(200)  
  67.     }  
  68.   }    
  69. }  

通过控制上面程序最后一行的时间间隔来控制模拟写入速度。下面我们来讨论实现实时统计每个用户的点击次数,它是按照用户分组进行累加次数,逻辑比较简单,关键是在实现过程中要注意一些问题,如对象序列化等。先看实现代码,稍后我们再详细讨论,代码实现如下所示:

[java]  view plain  copy
  1. object UserClickCountAnalytics {  
  2.   
  3.   def main(args: Array[String]): Unit = {  
  4.     var masterUrl = "local[1]"  
  5.     if (args.length > 0) {  
  6.       masterUrl = args(0)  
  7.     }  
  8.   
  9.     // Create a StreamingContext with the given master URL  
  10.     val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")  
  11.     val ssc = new StreamingContext(conf, Seconds(5))  
  12.   
  13.     // Kafka configurations  
  14.     val topics = Set("user_events")  
  15.     val brokers = "10.10.4.126:9092,10.10.4.127:9092"  
  16.     val kafkaParams = Map[String, String](  
  17.       "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")  
  18.   
  19.     val dbIndex = 1  
  20.     val clickHashKey = "app::users::click"  
  21.   
  22.     // Create a direct stream  
  23.     val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)  
  24.   
  25.     val events = kafkaStream.flatMap(line => {  
  26.       val data = JSONObject.fromObject(line._2)  
  27.       Some(data)  
  28.     })  
  29.   
  30.     // Compute user click times  
  31.     val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)  
  32.     userClicks.foreachRDD(rdd => {  
  33.       rdd.foreachPartition(partitionOfRecords => {  
  34.         partitionOfRecords.foreach(pair => {  
  35.           val uid = pair._1  
  36.           val clickCount = pair._2  
  37.           val jedis = RedisClient.pool.getResource  
  38.           jedis.select(dbIndex)  
  39.           jedis.hincrBy(clickHashKey, uid, clickCount)  
  40.           RedisClient.pool.returnResource(jedis)  
  41.         })  
  42.       })  
  43.     })  
  44.   
  45.     ssc.start()  
  46.     ssc.awaitTermination()  
  47.   
  48.   }  
  49. }  

上面代码使用了Jedis客户端来操作Redis,将分组计数结果数据累加写入Redis存储,如果其他系统需要实时获取该数据,直接从Redis实时读取即可。RedisClient实现代码如下所示:

 
[java]  view plain  copy
  1. object RedisClient extends Serializable {  
  2.   val redisHost = "10.10.4.130"  
  3.   val redisPort = 6379  
  4.   val redisTimeout = 30000  
  5.   lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)  
  6.   
  7.   lazy val hook = new Thread {  
  8.     override def run = {  
  9.       println("Execute hook thread: " + this)  
  10.       pool.destroy()  
  11.     }  
  12.   }  
  13.   sys.addShutdownHook(hook.run)  
  14. }  



上面代码我们分别在local[K]和Spark Standalone集群模式下运行通过。
如果我们是在开发环境进行调试的时候,也就是使用local[K]部署模式,在本地启动K个Worker线程来计算,这K个Worker在同一个JVM实例里,上面的代码默认情况是,如果没有传参数则是local[K]模式,所以如果使用这种方式在创建Redis连接池或连接的时候,可能非常容易调试通过,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的时候,就会报错,主要是由于在处理Redis连接池或连接的时候出错了。我们可以看一下Spark架构,如图所示(来自官网):
spark streaming从kafka获取数据,计算处理后存储到redis
无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。
在Spark集群环境部署Application后,在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上的Executor上(在Spark Streaming中是作用于DStream的操作),那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引用(Lazy Reference)来实现的,代码如下所示:

 
[java]  view plain  copy
  1. // lazy pool reference  
  2. lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)  
  3. ...  
  4. partitionOfRecords.foreach(pair => {  
  5.   val uid = pair._1  
  6.   val clickCount = pair._2  
  7.   val jedis = RedisClient.pool.getResource  
  8.   jedis.select(dbIndex)  
  9.   jedis.hincrBy(clickHashKey, uid, clickCount)  
  10.   RedisClient.pool.returnResource(jedis)  
  11. })  



另一种方式,我们将代码修改为,把对Redis连接的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,如下所示:

 
[java]  view plain  copy
  1. package org.shirdrn.spark.streaming  
  2.   
  3. import org.apache.commons.pool2.impl.GenericObjectPoolConfig  
  4. import org.apache.spark.SparkConf  
  5. import org.apache.spark.streaming.Seconds  
  6. import org.apache.spark.streaming.StreamingContext  
  7. import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions  
  8. import org.apache.spark.streaming.kafka.KafkaUtils  
  9.   
  10. import kafka.serializer.StringDecoder  
  11. import net.sf.json.JSONObject  
  12. import redis.clients.jedis.JedisPool  
  13.   
  14. object UserClickCountAnalytics {  
  15.   
  16.   def main(args: Array[String]): Unit = {  
  17.     var masterUrl = "local[1]"  
  18.     if (args.length > 0) {  
  19.       masterUrl = args(0)  
  20.     }  
  21.   
  22.     // Create a StreamingContext with the given master URL  
  23.     val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")  
  24.     val ssc = new StreamingContext(conf, Seconds(5))  
  25.   
  26.     // Kafka configurations  
  27.     val topics = Set("user_events")  
  28.     val brokers = "10.10.4.126:9092,10.10.4.127:9092"  
  29.     val kafkaParams = Map[String, String](  
  30.       "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")  
  31.   
  32.     val dbIndex = 1  
  33.     val clickHashKey = "app::users::click"  
  34.   
  35.     // Create a direct stream  
  36.     val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)  
  37.   
  38.     val events = kafkaStream.flatMap(line => {  
  39.       val data = JSONObject.fromObject(line._2)  
  40.       Some(data)  
  41.     })  
  42.   
  43.     // Compute user click times  
  44.     val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)  
  45.     userClicks.foreachRDD(rdd => {  
  46.       rdd.foreachPartition(partitionOfRecords => {  
  47.         partitionOfRecords.foreach(pair => {  
  48.            
  49.           /** 
  50.            * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool} 
  51.            */  
  52.           object InternalRedisClient extends Serializable {  
  53.              
  54.             @transient private var pool: JedisPool = null  
  55.              
  56.             def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,  
  57.                 maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {  
  58.               makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, truefalse10000)     
  59.             }  
  60.              
  61.             def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,  
  62.                 maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,  
  63.                 testOnReturn: Boolean, maxWaitMillis: Long): Unit = {  
  64.               if(pool == null) {  
  65.                    val poolConfig = new GenericObjectPoolConfig()  
  66.                    poolConfig.setMaxTotal(maxTotal)  
  67.                    poolConfig.setMaxIdle(maxIdle)  
  68.                    poolConfig.setMinIdle(minIdle)  
  69.                    poolConfig.setTestOnBorrow(testOnBorrow)  
  70.                    poolConfig.setTestOnReturn(testOnReturn)  
  71.                    poolConfig.setMaxWaitMillis(maxWaitMillis)  
  72.                    pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)  
  73.                     
  74.                    val hook = new Thread{  
  75.                         override def run = pool.destroy()  
  76.                    }  
  77.                    sys.addShutdownHook(hook.run)  
  78.               }  
  79.             }  
  80.              
  81.             def getPool: JedisPool = {  
  82.               assert(pool != null)  
  83.               pool  
  84.             }  
  85.           }  
  86.            
  87.           // Redis configurations  
  88.           val maxTotal = 10  
  89.           val maxIdle = 10  
  90.           val minIdle = 1  
  91.           val redisHost = "10.10.4.130"  
  92.           val redisPort = 6379  
  93.           val redisTimeout = 30000  
  94.           val dbIndex = 1  
  95.           InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)  
  96.            
  97.           val uid = pair._1  
  98.           val clickCount = pair._2  
  99.           val jedis =InternalRedisClient.getPool.getResource  
  100.           jedis.select(dbIndex)  
  101.           jedis.hincrBy(clickHashKey, uid, clickCount)  
  102.           InternalRedisClient.getPool.returnResource(jedis)  
  103.         })  
  104.       })  
  105.     })  
  106.   
  107.     ssc.start()  
  108.     ssc.awaitTermination()  
  109.   
  110.   }  
  111. }  



上面代码实现,得益于Scala语言的特性,可以在代码中任何位置进行class或object的定义,我们将用来管理Redis连接的代码放在了特定操作的内部,就避免了瞬态(Transient)对象跨节点序列化的问题。这样做还要求我们能够了解Spark内部是如何操作RDD数据集的,更多可以参考RDD或Spark相关文档。
在集群上,以Standalone模式运行,执行如下命令:

[java]  view plain  copy
  1. cd /usr/local/spark  
  2. ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077  


可以查看集群中各个Worker节点执行计算任务的状态,也可以非常方便地通过Web页面查看。
下面,看一下我们存储到Redis中的计算结果,如下所示:

 
[html]  view plain  copy
  1. 127.0.0.1:6379[1]> HGETALL app::users::click  
  2. 1) "4A4D769EB9679C054DE81B973ED5D768"  
  3. 2) "7037"  
  4. 3) "8dfeb5aaafc027d89349ac9a20b3930f"  
  5. 4) "6992"  
  6. 5) "011BBF43B89BFBF266C865DF0397AA71"  
  7. 6) "7021"  
  8. 7) "97edfc08311c70143401745a03a50706"  
  9. 8) "6874"  
  10. 9) "d7f141563005d1b5d0d3dd30138f3f62"  
  11. 10) "7057"  
  12. 11) "a95f22eabc4fd4b580c011a3161a9d9d"  
  13. 12) "7092"  
  14. 13) "6b67c8c700427dee7552f81f3228c927"  
  15. 14) "7266"  
  16. 15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"  
  17. 16) "7188"  
  18. 17) "c8ee90aade1671a21336c721512b817a"  
  19. 18) "6950"  
  20. 19) "068b746ed4620d25e26055a9f804385f"  



附录

这里,附上前面开发的应用所对应的依赖,以及打包Spark Streaming应用程序的Maven配置,以供参考。如果使用maven-shade-plugin插件,配置有问题的话,打包后在Spark集群上提交Application时候可能会报错Invalid signature file digest for Manifest main attributes。参考的Maven配置,如下所示:

 
[html]  view plain  copy
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.      <modelVersion>4.0.0</modelVersion>  
  4.      <groupId>org.shirdrn.spark</groupId>  
  5.      <artifactId>spark</artifactId>  
  6.      <version>0.0.1-SNAPSHOT</version>  
  7.   
  8.      <dependencies>  
  9.           <dependency>  
  10.                <groupId>org.apache.spark</groupId>  
  11.                <artifactId>spark-core_2.10</artifactId>  
  12.                <version>1.3.0</version>  
  13.           </dependency>  
  14.           <dependency>  
  15.                <groupId>org.apache.spark</groupId>  
  16.                <artifactId>spark-streaming_2.10</artifactId>  
  17.                <version>1.3.0</version>  
  18.           </dependency>  
  19.           <dependency>  
  20.                <groupId>net.sf.json-lib</groupId>  
  21.                <artifactId>json-lib</artifactId>  
  22.                <version>2.3</version>  
  23.           </dependency>  
  24.           <dependency>  
  25.                <groupId>org.apache.spark</groupId>  
  26.                <artifactId>spark-streaming-kafka_2.10</artifactId>  
  27.                <version>1.3.0</version>  
  28.           </dependency>  
  29.           <dependency>  
  30.                <groupId>redis.clients</groupId>  
  31.                <artifactId>jedis</artifactId>  
  32.                <version>2.5.2</version>  
  33.           </dependency>  
  34.           <dependency>  
  35.                <groupId>org.apache.commons</groupId>  
  36.                <artifactId>commons-pool2</artifactId>  
  37.                <version>2.2</version>  
  38.           </dependency>  
  39.      </dependencies>  
  40.   
  41.      <build>  
  42.           <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>  
  43.           <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>  
  44.           <resources>  
  45.                <resource>  
  46.                     <directory>${basedir}/src/main/resources</directory>  
  47.                </resource>  
  48.           </resources>  
  49.           <testResources>  
  50.                <testResource>  
  51.                     <directory>${basedir}/src/test/resources</directory>  
  52.                </testResource>  
  53.           </testResources>  
  54.           <plugins>  
  55.                <plugin>  
  56.                     <artifactId>maven-compiler-plugin</artifactId>  
  57.                     <version>3.1</version>  
  58.                     <configuration>  
  59.                          <source>1.6</source>  
  60.                          <target>1.6</target>  
  61.                     </configuration>  
  62.                </plugin>  
  63.                <plugin>  
  64.                     <groupId>org.apache.maven.plugins</groupId>  
  65.                     <artifactId>maven-shade-plugin</artifactId>  
  66.                     <version>2.2</version>  
  67.                     <configuration>  
  68.                          <createDependencyReducedPom>true</createDependencyReducedPom>  
  69.                     </configuration>  
  70.                     <executions>  
  71.                          <execution>  
  72.                               <phase>package</phase>  
  73.                               <goals>  
  74.                                    <goal>shade</goal>  
  75.                               </goals>  
  76.                               <configuration>  
  77.                                    <artifactSet>  
  78.                                         <includes>  
  79.                                              <include>*:*</include>  
  80.                                         </includes>  
  81.                                    </artifactSet>  
  82.                                    <filters>  
  83.                                         <filter>  
  84.                                              <artifact>*:*</artifact>  
  85.                                              <excludes>  
  86.                                                   <exclude>META-INF/*.SF</exclude>  
  87.                                                   <exclude>META-INF/*.DSA</exclude>  
  88.                                                   <exclude>META-INF/*.RSA</exclude>  
  89.                                              </excludes>  
  90.                                         </filter>  
  91.                                    </filters>  
  92.                                    <transformers>  
  93.                                         <transformer  
  94.                                              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />  
  95.                                         <transformer  
  96.                                              implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">  
  97.                                              <resource>reference.conf</resource>  
  98.                                         </transformer>  
  99.                                         <transformer  
  100.                                              implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">  
  101.                                              <resource>log4j.properties</resource>  
  102.                                         </transformer>  
  103.                                    </transformers>  
  104.                               </configuration>  
  105.                          </execution>  
  106.                     </executions>  
  107.                </plugin>  
  108.           </plugins>  
  109.      </build>  
  110. </project>  



参考链接



http://shiyanjun.cn/archives/1097.html