Apache Spark Window function with nested column

Time: 2022-01-10 01:44:49

I'm not sure this is a bug (or just incorrect syntax). I searched around and didn't see this mentioned elsewhere so I'm asking here before filing a bug report.

I'm trying to use a Window function partitioned on a nested column. I've created a small example below demonstrating the problem.

import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
  .withColumn("Data", struct("A", "B", "C")).drop("A").drop("B").drop("C")
val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").where("num = max").drop("max").show

The above results in an error:

org.apache.spark.sql.AnalysisException: resolved attribute(s) A#39,B#40 missing from num#33,Data#37 in operator !Project [num#33,Data#37,A#39,B#40];
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
  ...

If instead those columns aren't nested, it works fine. Am I missing something with the syntax, or is this a bug?

1 solution

#1


It looks to me like you are hitting a bug that occurs when the analyzer tries to expand the *.

import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

sql("SET spark.sql.eagerAnalysis=false") // Let us see the error even though we are constructing an invalid tree

val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
  .withColumn("Data", struct("A", "B", "C"))
  .drop("A")
  .drop("B")
  .drop("C")

val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)
data.select($"*", max("num").over(winSpec) as "max").explain(true)

By turning off eager analysis (so that we can call explain without it throwing an error) you can see that the "*" is getting expanded to include columns that aren't actually available:

== Parsed Logical Plan ==
'Project [*,'max('num) windowspecdefinition('Data.A,'Data.B,'num DESC,UnspecifiedFrame) AS max#64928]
+- Project [num#64926,Data#64927]
   +- Project [C#64925,num#64926,Data#64927]
      +- Project [B#64924,C#64925,num#64926,Data#64927]
         +- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
            +- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
               +- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]

== Analyzed Logical Plan ==
num: int, Data: struct<A:string,B:string,C:string>, max: int
Project [num#64926,Data#64927,max#64928]
+- Project [num#64926,Data#64927,A#64932,B#64933,max#64928,max#64928]
   +- Window [num#64926,Data#64927,A#64932,B#64933], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax(num#64926) windowspecdefinition(A#64932,B#64933,num#64926 DESC,RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS max#64928], [A#64932,B#64933], [num#64926 DESC]
      +- !Project [num#64926,Data#64927,A#64932,B#64933]
         +- Project [num#64926,Data#64927]
            +- Project [C#64925,num#64926,Data#64927]
               +- Project [B#64924,C#64925,num#64926,Data#64927]
                  +- Project [A#64923,B#64924,C#64925,num#64926,struct(A#64923,B#64924,C#64925) AS Data#64927]
                     +- Project [_1#64919 AS A#64923,_2#64920 AS B#64924,_3#64921 AS C#64925,_4#64922 AS num#64926]
                        +- LocalRelation [_1#64919,_2#64920,_3#64921,_4#64922], [[a,b,c,3],[c,b,a,3]]

I've filed this here: https://issues.apache.org/jira/browse/SPARK-12989. If you manually list out the columns instead of using a * that should act as a workaround.
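
For illustration, the workaround might look roughly like the sketch below. It assumes a spark-shell session where sqlContext is already defined (as in the question) and supports window functions. The alias max_num is just a name chosen here to avoid confusion with the max function, and I have not verified this against every affected Spark version.

```scala
import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Same data as in the question: A, B, C are nested inside the struct column "Data"
val data = Seq(("a", "b", "c", 3), ("c", "b", "a", 3)).toDF("A", "B", "C", "num")
  .withColumn("Data", struct("A", "B", "C"))
  .drop("A")
  .drop("B")
  .drop("C")

val winSpec = Window.partitionBy("Data.A", "Data.B").orderBy($"num".desc)

// Workaround sketch: name the columns explicitly instead of selecting $"*",
// so the analyzer never has to expand the star
data.select($"num", $"Data", max("num").over(winSpec) as "max_num")
  .where($"num" === $"max_num")
  .drop("max_num")
  .show()
```

The only change from the question's query is that $"*" is replaced by the explicit list $"num", $"Data"; the window specification still partitions on the nested fields.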
