在Spark Streaming上使用Spark Mllib的思路实现

时间:2023-01-26 20:46:36

在Spark Streaming中,数据抽象是DStream(离散数据流)。底层是靠封装RDD实现,而Spark Mllib是早期的机器学习库,主要也是基于RDD抽象数据集实现的算法。因此在Spark Streaming上想要使用Spark Mllib首先就要获取到DStream对应的RDD,而DStream中可以获取到RDD的方法有如下:

 

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit,displayInnerRDDOps: Boolean): Unit

def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

def transformWith[U: ClassTag, V: ClassTag](
		other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]): DStream[V] 
具体方法实现参考源码,观察方法签名我们可以得知foreachRDD是不可以的,因为foreachRDD返回值类型Unit,获取不到结果(但是通过在foreachRDD方法中实现预测结果输出外部存储的话,也是可以实现的)。所以方便实现的就是transform算子,需要一个RDD=>RDD的函数,我们就可以将这个函数定义为预测函数,然后传入(识别模型我们可以通过在加载方式加载离线模型)

具体实现伪代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Daxin on 2017/8/9.
  * 本程序主要想要说明如何在Spark Streaming上使用Spark Mllib
  */
object SparkStreamingMl {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("socketStream")
    //基于Reciver模式,所以线程数目需要大于1,否则只能接受数据无法处理数据
    conf.setMaster("local[*]") //如果设置conf.setMaster("local[1]")的话,将会没有线程负责计算
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val ssc = new StreamingContext(sc, Seconds(2))

    ssc.checkpoint("")

    //TODO 最终创建一个SocketInputDStream返回
    val line = ssc.socketTextStream("node", 9999)


    /**
      * 模拟识别的预测逻辑
      *
      * @param rdd
      * @tparam T
      * @return
      */
    def predict[T](rdd: RDD[T]): RDD[T] = {
      //省略预测的逻辑
      rdd
    }
    /**
      * 直接定义函数
      */
    val predictFunc = (rdd: RDD[String]) => {
      rdd
    }

    val func = predict[String] _ //方法转函数,此处主要想复习一下Scala知识点
    line.transform(func(_)).print()

    ssc.start()
    ssc.awaitTermination()
  }


}


由于Spark Mllib是基于RDD的API,目前Spark已经不再为Mllib添加新特性了,Mllib目前处于维护阶段,以后更多支持的是基于DataFrame的ml API。DataFrame的数据结构为:

/**
 * Allows the execution of relational queries, including those expressed in SQL using Spark.
 *
 *  @groupname dataType Data types
 *  @groupdesc Spark SQL data types.
 *  @groupprio dataType -3
 *  @groupname field Field
 *  @groupprio field -2
 *  @groupname row Row
 *  @groupprio row -1
 */
package object sql {

  /**
   * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
   * with the query planner and is not designed to be stable across spark releases.  Developers
   * writing libraries should instead consider using the stable APIs provided in
   * [[org.apache.spark.sql.sources]]
   */
  @DeveloperApi
  @InterfaceStability.Unstable
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]
}