UDFs in Spark

Time: 2023-03-08 23:06:32
The example below registers two Scala functions as Spark SQL UDFs and uses them in a WHERE clause to filter a DataFrame built from an RDD.

 package big.data.analyse.udfudaf

 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}

 /**
   * Created by zhen on 2018/11/25.
   */
 object SparkUdfUdaf {
    // UDF helper: true when the given value is greater than 18
    def isAdult(age: Int): Boolean = age > 18
    def main(args: Array[String]): Unit = {
     val spark = SparkSession
       .builder()
       .appName("UdfUdaf")
       .master("local[2]")
       .getOrCreate()
     val userData = Array(
       "2015,11,www.baidu.com",
       "2016,14,www.google.com",
       "2017,13,www.apache.com",
       "2015,21,www.spark.com",
       "2016,32,www.hadoop.com",
       "2017,18,www.solr.com",
       "2017,14,www.hive.com"
     )
     val sc = spark.sparkContext
     val sqlContext = spark.sqlContext
      val userDataRDD = sc.parallelize(userData) // convert the local array to an RDD
      val userDataType = userDataRDD.map(line => {
          // split each CSV line and build a Row matching the schema below
          val Array(age, id, url) = line.split(",")
          Row(age, id.toInt, url)
        })
      // explicit schema: age (string), id (int), url (string)
      val structTypes = StructType(Array(
       StructField("age", StringType, true),
       StructField("id", IntegerType, true),
       StructField("url", StringType, true)
     ))
      // convert the RDD of Rows to a DataFrame using the explicit schema
      val userDataFrame = sqlContext.createDataFrame(userDataType, structTypes)
      // register the DataFrame as a temporary view
     userDataFrame.createOrReplaceTempView("udf")
      // register a UDF (approach 1: anonymous function)
     spark.udf.register("getLength", (str : String) => str.length)
      // register a UDF (approach 2: lift an existing method with _)
     spark.udf.register("isAdult", isAdult _)
      // run the SQL query; both UDFs appear in the WHERE clause
     val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.id)"
     val result = sqlContext.sql(sql)
      result.collect().foreach(println) // collect the matching rows to the driver and print them

      spark.stop()
   }
 }

Results:

Given the two predicates, only the www.spark.com row (age 2015, id 21) should satisfy the query: its URL is 13 characters long and 21 > 18.

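Besides registering functions by name for SQL, the same logic can also be applied through the DataFrame API with org.apache.spark.sql.functions.udf. The sketch below is a minimal, hypothetical variant (the isAdultUdf and filtered names are not from the original code); it assumes the spark session and userDataFrame built above, and uses the built-in length function in place of the getLength UDF.

 import org.apache.spark.sql.functions.{col, length, udf}

 // Hypothetical DataFrame-API variant of the same filter, assuming `userDataFrame`
 // from the example above is in scope.
 val isAdultUdf = udf((age: Int) => age > 18) // same predicate as isAdult
 val filtered = userDataFrame
   .filter(length(col("url")) === 13 && isAdultUdf(col("id")))
 filtered.show()

This keeps the predicates type-checked at the Column level instead of embedding them in a SQL string.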