UDAF(用户自定义聚合函数)求众数

时间:2022-12-08 10:39:24
除了逐行处理数据的udf,还有比较常见的就是聚合多行处理udaf,自定义聚合函数。类比rdd编程就是map和reduce算子的区别。
自定义UDAF,需要extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction,并实现接口中的8个方法。
udaf写起来比较麻烦,我下面列一个之前写的取众数聚合函数,在我们通常在聚合统计的时候可能会受某条脏数据的影响。
举个栗子:
对于一个app日志聚合的时候,有id与ip,原则上一个id有一个ip,但是在多条数据里有一条ip是错误的或者为空的,这时候group能会聚合成两条数据了就,如果使用max,min对ip也进行聚合,那也不太合理,这时候可以进行投票,去类似多数对结果,从而聚合后只有一个设备。
废话少说,上代码:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._ /**
* Description: 自定义聚合函数:众数(取列内频率最高的一条)
*/ class UDAFGetMode extends UserDefinedAggregateFunction {
override def inputSchema: StructType = {
StructType(StructField("inputStr", StringType, true) :: Nil)
} override def bufferSchema: StructType = {
StructType(StructField("bufferMap", MapType(keyType = StringType, valueType = IntegerType), true) :: Nil)
} override def dataType: DataType = StringType override def deterministic: Boolean = false //初始化map
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer() = scala.collection.immutable.Map[String, Int]()
} //如果包含这个key则value+1,否则写入key,value=1
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val key = input.getAs[String]()
val immap = buffer.getAs[Map[String, Int]]()
val bufferMap = scala.collection.mutable.Map[String, Int](immap.toSeq: _*)
val ret = if (bufferMap.contains(key)) {
// val new_value = bufferMap.get(key).get + 1
val new_value = bufferMap(key) +
bufferMap.put(key, new_value)
bufferMap
} else {
bufferMap.put(key, )
bufferMap
}
buffer.update(, ret) } override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
//合并两个map 相同的key的value累加
val tempMap = (buffer1.getAs[Map[String, Int]]() /: buffer2.getAs[Map[String, Int]]()) {
case (map, (k, v)) => map + (k -> (v + map.getOrElse(k, )))
}
buffer1.update(, tempMap)
} override def evaluate(buffer: Row): Any = {
//返回值最大的key
var max_value =
var max_key = ""
buffer.getAs[Map[String, Int]]().foreach({ x =>
val key = x._1
val value = x._2
if (value > max_value) {
max_value = value
max_key = key
}
})
max_key
}
}

测试类:

object UDAFTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName(this.getClass.getSimpleName).getOrCreate()
spark.udf.register("get_mode", new UDAFGetMode)
import spark.implicits._
val df = Seq(
(, "10.10.1.1", "start"),
(, "10.10.1.1", "search"),
(, "123.123.123.1", "search"),
(, "10.10.1.0", "stop"),
(, "123.123.123.1", "start")
).toDF("id", "ip", "action") df.createTempView("tb")
spark.sql(s"select id,get_mode(ip) as u_ip,count(*) as cnt from tb group by id").show()
}
}

UDAF(用户自定义聚合函数)求众数的更多相关文章

  1. Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)

    在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF( ...

  2. hive学习笔记之十:用户自定义聚合函数(UDAF)

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是<hive学习笔记>的第十 ...

  3. 【Spark篇】---SparkSQL中自定义UDF和UDAF,开窗函数的应用

    一.前述 SparkSQL中的UDF相当于是1进1出,UDAF相当于是多进一出,类似于聚合函数. 开窗函数一般分组取topn时常用. 二.UDF和UDAF函数 1.UDF函数 java代码: Spar ...

  4. 【Spark-SQL学习之三】 UDF、UDAF、开窗函数

    环境 虚拟机:VMware 10 Linux版本:CentOS-6.5-x86_64 客户端:Xshell4 FTP:Xftp4 jdk1.8 scala-2.10.4(依赖jdk1.8) spark ...

  5. SQL Server 2008 R2——PIVOT 行转列 以及聚合函数的选择

    ==================================声明================================== 本文原创,转载在正文中显要的注明作者和出处,并保证文章的完 ...

  6. 2、SQL基础整理(聚合函数)

    聚合函数 --求平均 select  AVG(age) as 年龄 from xuesheng select AVG(chinese) as 语文 from xuesheng where class ...

  7. Hive学习之自己定义聚合函数

    Hive支持用户自己定义聚合函数(UDAF),这样的类型的函数提供了更加强大的数据处理功能. Hive支持两种类型的UDAF:简单型和通用型.正如名称所暗示的,简单型UDAF的实现很easy,但因为使 ...

  8. Kafka:ZK&plus;Kafka&plus;Spark Streaming集群环境搭建(十五)Spark编写UDF、UDAF、Agg函数

    Spark Sql提供了丰富的内置函数让开发者来使用,但实际开发业务场景可能很复杂,内置函数不能够满足业务需求,因此spark sql提供了可扩展的内置函数. UDF:是普通函数,输入一个或多个参数, ...

  9. Mongodb学习笔记四&lpar;Mongodb聚合函数&rpar;

    第四章 Mongodb聚合函数 插入 测试数据 ;j<;j++){ for(var i=1;i<3;i++){ var person={ Name:"jack"+i, ...

随机推荐

  1. 【JUC】JDK1&period;8源码分析之SynchronousQueue(九)

    一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后 ...

  2. unreal 自定义 Slate Style Sets

    搜集到的最有价值的一篇教学,按照作者的方法尝试中遇到了一些问题.[感谢这位作者!] 网址:https://wiki.unrealengine.com/Slate_Style_Sets_Part_2 在 ...

  3. 未能加载文件或程序集&OpenCurlyDoubleQuote;MySql&period;Web&period;v20&comma; Version&equals;6&period;9&period;4&period;0&comma; Culture&equals;neutral&comma; PublicKeyToken&equals;c5687fc88969c44d”或它的某一个依赖项。系统找不到指定的文件

    未能加载文件或程序集“MySql.Web.v20, Version=6.9.4.0, Culture=neutral, PublicKeyToken=c5687fc88969c44d”或它的某一个依赖 ...

  4. try、catch 和 throw 语句 &lpar;了解&rpar;

    C++ 异常使用 try.catch 和 throw 关键字. 引发表达式指示错误或异常情况. 可以将任何类型的对象用作引发表达式的操作数. 此对象通常用于传达有关错误的信息. 通常,应使用在标准库中 ...

  5. &lbrack;Hive - LanguageManual&rsqb; Describe

    Describe Describe Database Describe Table/View/Column Display Column Statistics Describe Partition D ...

  6. HDU 1160 FatMouse&&num;39&semi;s Speed &lpar;DP&rpar;

    FatMouse's Speed Time Limit:1000MS     Memory Limit:32768KB     64bit IO Format:%I64d & %I64u Su ...

  7. loadrunner使用socket协议来实现客户端对服务器产生压力实例。(通过发送心跳包,达到连接多个客户端的目的)

    #include "lrs.h" vuser_init(){ char *ip; int handler; //编写获取LR分配的Vuser IP函数,将IP保存在ip变量中. i ...

  8. Json常用序列化工具包大比拼

    一.前言 Json已成为计算机编程中最常用的数据传输和存储格式之一,所以对Json的序列化和反序列化工具的选择也是互联网系统中比较重要的环节,尤其在高并发下的执行效率,可能会直接影响系统的吞吐率.本文 ...

  9. python线程的同步事件Event

    Event对象: 用于线程间的通信,某个线程需要根据其他线程的状态来判断自己的下一步操作. Event内部定义了一个全局变量:_flag,默认为False. 当_flag = False时,会阻塞当前 ...

  10. gitlab分支代码本地拉取及jenkins关联gitlab分支

    git本地拉取 git init git remote add origin http://47.*.*.*:8089/back_dev/claimeureka.git git fetch origi ...