package big.data.analyse.udfudaf

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Demonstrates registering Scala functions as Spark SQL UDFs and calling them
 * from a SQL query over a small in-memory DataFrame.
 *
 * Created by zhen on 2018/11/25.
 */
object SparkUdfUdaf {

  /**
   * Returns whether `age` is strictly greater than 18.
   *
   * Registered in [[main]] as the SQL UDF `isAdult`. Note the boundary: an age
   * of exactly 18 yields `false`.
   *
   * @param age the age to test
   * @return `true` if `age > 18`, otherwise `false`
   */
  def isAdult(age: Int): Boolean = age > 18

  /**
   * Builds a local SparkSession, loads the sample records into the temp view
   * `udf`, registers the `getLength` and `isAdult` UDFs, and prints the rows
   * whose url is 13 characters long and whose age passes [[isAdult]].
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UdfUdaf")
      .master("local[2]")
      .getOrCreate()

    try {
      // Each record is "<first>,<age>,<url>". The first field's values look like
      // years (2015-2017) -- NOTE(review): confirm its meaning.
      val userData = Seq(
        "2015,11,www.baidu.com",
        "2016,14,www.google.com",
        "2017,13,www.apache.com",
        "2015,21,www.spark.com",
        "2016,32,www.hadoop.com",
        "2017,18,www.solr.com",
        "2017,14,www.hive.com"
      )

      // Convert the raw lines into an RDD of Rows matching `schema` below.
      val userRows = spark.sparkContext
        .parallelize(userData)
        .map { line =>
          val Array(year, age, url) = line.split(",")
          Row(year, age.toInt, url)
        }

      // The second field is the one fed to isAdult, so it is the "age" column.
      val schema = StructType(Seq(
        StructField("year", StringType, nullable = true),
        StructField("age", IntegerType, nullable = true),
        StructField("url", StringType, nullable = true)
      ))

      // RDD -> DataFrame, then expose it to SQL as the temporary view "udf".
      spark.createDataFrame(userRows, schema).createOrReplaceTempView("udf")

      // UDF registration, style 1: an anonymous function. Returning Option makes
      // it null-safe: a NULL url yields SQL NULL instead of a NullPointerException.
      spark.udf.register("getLength", (str: String) => Option(str).map(_.length))
      // UDF registration, style 2: eta-expansion of an existing method.
      spark.udf.register("isAdult", isAdult _)

      val sql = "select * from udf where getLength(udf.url)=13 and isAdult(udf.age)"
      // collect() brings the (small) result to the driver, so the output appears
      // in the driver console rather than in executor logs on a cluster.
      spark.sql(sql).collect().foreach(println)
    } finally {
      // Release the session's resources even if the query fails.
      spark.stop()
    }
  }
}
结果: