StreamingListener notes (spark-2.2.0)

Date: 2022-03-22 00:51:04

Notes on StreamingListener, the listener interface in Spark Streaming.

Overview

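    StreamingListener is the event-listening mechanism for the individual stages of a Spark Streaming application.
    Its usage is very similar to SparkListener, although some details differ.
Source code
//To listen for events at a given stage of Spark Streaming, implement the corresponding event method of this trait
//The trait itself already carries explanatory doc comments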
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started. */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
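Because every callback in the trait has an empty default body, a listener only needs to override the events it cares about. The following is a minimal sketch of that idea, not code from the original notes: the class name ReceiverHealthListener and the errorCount field are hypothetical, and output goes to println only to keep the example self-contained.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.streaming.scheduler.{
  StreamingListener,
  StreamingListenerReceiverError,
  StreamingListenerReceiverStopped
}

// Hypothetical listener: only the receiver callbacks are overridden,
// every other callback keeps its empty default body from the trait.
class ReceiverHealthListener extends StreamingListener {

  // Number of receiver errors reported so far.
  val errorCount = new AtomicLong(0L)

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val info = receiverError.receiverInfo
    errorCount.incrementAndGet()
    println(s"Receiver ${info.name} (stream ${info.streamId}) error: ${info.lastErrorMessage}")
  }

  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    val info = receiverStopped.receiverInfo
    println(s"Receiver ${info.name} (stream ${info.streamId}) stopped")
  }
}
```

A listener like this would be registered the same way as any other StreamingListener, through ssc.addStreamingListener.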
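Example code
package z.cloud.test.listener

//Imports are not shown in the original notes; the Logging trait is assumed to be
//Spark's internal one (org.apache.spark.internal.Logging), which is not a public API
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class MyTestStreamingListener(ssc: StreamingContext) extends StreamingListener with Logging {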

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val execTime = batchInfo.processingDelay.getOrElse(0L)
    val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L)
    logInfo(s"Processing time: $execTime ms, scheduling delay: $schedulingTime ms")
  }

}
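Besides processingDelay and schedulingDelay, BatchInfo also exposes totalDelay (the sum of the two) and batchTime. As a rough sketch of how these could be used, the listener below counts batches whose total delay exceeds the batch interval. The class name SlowBatchListener, the batchIntervalMs parameter and the slowBatches counter are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical listener: flags batches that take longer, end to end,
// than the batch interval (a sign that the job is falling behind).
class SlowBatchListener(batchIntervalMs: Long) extends StreamingListener {

  // Number of batches whose total delay exceeded the interval.
  val slowBatches = new AtomicInteger(0)

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // totalDelay is schedulingDelay + processingDelay in milliseconds;
    // it is None when the timing information is incomplete.
    info.totalDelay.foreach { total =>
      if (total > batchIntervalMs) {
        slowBatches.incrementAndGet()
        println(s"Batch ${info.batchTime}: total delay $total ms exceeds $batchIntervalMs ms")
      }
    }
  }
}
```

For the 20-second batches used in these notes, such a listener would be created as new SlowBatchListener(20000L).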
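Applying the example code
//A StreamingListener does not need to be set in the configuration; it can be added directly to the StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import z.cloud.test.listener.MyTestStreamingListener

object My {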
    def main(args : Array[String]) : Unit = {
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf,Seconds(20))
        ssc.addStreamingListener(new MyTestStreamingListener(ssc))

        ....
    }
}
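To try a listener locally without an external data source, something along the following lines may work. This is only a sketch under assumptions that are not in the original notes: the local[2] master, the app name, the 1-second batch interval, the queueStream test input and the ListenerDemo object are all chosen here for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical local demo: runs a tiny streaming job for a fixed time
// and returns how many batches the listener saw complete.
object ListenerDemo {

  def run(timeoutMs: Long): Int = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("listener-demo")
    val ssc = new StreamingContext(conf, Seconds(1))
    val completed = new AtomicInteger(0)

    // Register the listener before start() so that no batch events are missed.
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        completed.incrementAndGet()
        println(s"Batch ${info.batchTime}: processing delay ${info.processingDelay.getOrElse(0L)} ms")
      }
    })

    // In-memory test input: a queue of RDDs consumed one per batch.
    val queue = mutable.Queue[RDD[Int]](ssc.sparkContext.makeRDD(1 to 100))
    ssc.queueStream(queue).foreachRDD { rdd => println(s"count = ${rdd.count()}") }

    ssc.start()
    ssc.awaitTerminationOrTimeout(timeoutMs)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    completed.get
  }

  def main(args: Array[String]): Unit = {
    println(s"Completed batches: ${run(5000L)}")
  }
}
```

The counter is an AtomicInteger because the callback is invoked from Spark's listener thread rather than from the thread that calls run.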