自定义基于xmemcached协议消息队列的Spark Streaming 接收器

时间:2022-12-14 20:54:40

    虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的。对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可。而Receiver的实现只需要简单地实现两个方法:

  1、onStart():接收数据。

  2、onStop():停止接收数据。
    一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收。而onStop()方法负责确保这些接收数据的线程是停止的,在  Receiver 被关闭时调用了,可以做一些 close 工作。负责接收数据的线程可以通过isStopped()来判断是否要停止数据接收。
 
    对于接收到的数据,需要存储到spark 框架里,使用的是store()方法。Receiver抽象类提供了4种store()方法,分别可用于存储:
  1、

 单条小数据

 

 

2、数组形式的块数据

 

3、ByteBuffer 形式的块数据

 

4、iterator 形式的块数据

 

 

 

这4种的store()方法的实现都是直接将数据传递给 ReceiverSupervisor 来进行存储的。所以要自定义一个 Receiver,只要在 onStart() 里创建数据接收的线程,并在接收到数据时 store() 到 Spark Streamimg 框架就可以了。

 

 

下面代码是一个基于xmemcached协议消息队列的Spark Streaming 接收器:

    

import Fqueue.FqueueReceiver
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class FqueueStreamingReceiver(val address: String, val connectionPoolSize: Int, val timeOut: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  private var receiver: Option[FqueueReceiver] = None

  def onStart() {
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop(): Unit = {
    receiver foreach { _.stop() }
  }

  private def receive(): Unit = {
    val fqueueReceiver = new FqueueReceiver(address, connectionPoolSize, timeOut)
    receiver = Some(fqueueReceiver)
    receiver foreach { _.connect() }

    try
    {
      var stop = false
      while (!isStopped() && !stop) {
        val data = fqueueReceiver.deQueue("track_BOdao2015*")
        data match {
          case Some(str) => store(str)
          case None => Thread.sleep(1000)//stop = true
        }
      }
      receiver foreach { _.stop() }
    } catch {
      case e: Exception =>
        println("get data from fqueue err! pleace sure the server is live")
        println(e.getMessage)
        println(e.getStackTraceString)
        receiver foreach { _.stop() }
    }
  }
}

    在自定义了spark streaming的Receiver后,可以在应用中使用:

def main(args: Array[String]) {
    new Thread("fqueue sender") {
      override def run() { sendData() }
    }.start()
    val config = new SparkConf().setAppName("testfqueue").setMaster("local[2]")
    val ssc = new StreamingContext(config, Seconds(5))
    val lines = ssc.receiverStream(new FqueueStreamingReceiver("localhost:18740", 4, 4000))
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }

 链接:

  http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  基于xmemcached的消息队列

  本文的Receiver实现源码

  喜欢的话github项目上送个星星(⊙o⊙)哦........您的star是我的动力!