如何在Spark SQL中按时间间隔分组

时间:2021-11-20 05:33:02

My dataset looks like this:

我的数据集是这样的:

KEY |Event_Type | metric | Time 
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

I want to get all when the keys that verify this:

我想要得到所有的关键验证:

"SUM of metric for a specific event" > threshold during 5 minutes .

“特定事件的度量的总和”>阈值在5分钟内。

This appear to me a perfect candidate for using the Sliding Windows Functions .

在我看来,这是使用滑动窗口函数的最佳人选。

How can I do this with Spark SQL ?

如何使用Spark SQL实现这一点?

Thank you.

谢谢你!

2 个解决方案

#1


20  

Spark >= 2.0

火花> = 2.0

You can use window (not to be mistaken with window functions). Depending on a variant it assigns timestamp, to one more, potentially overlapping buckets:

您可以使用窗口(不要错误地使用窗口函数)。根据不同的变体,它为另一个可能重叠的桶分配时间戳:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+

Spark < 2.0

火花< 2.0

Lets start with example data:

让我们从示例数据开始:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that event is identified by KEY. If this is not the case you can adjust GROUP BY / PARTITION BY clauses according to your requirements.

我假设事件是由KEY标识的。如果不是这样,您可以根据您的需求按子句对组/分区进行调整。

If you're interested in an aggregation with static window independent of data convert timestamps to a numeric data type and round

如果您对具有独立于数据的静态窗口的聚合感兴趣,那么可以将时间戳转换为数字数据类型和整数

import org.apache.spark.sql.functions.{round, sum}

// cast string to timestamp
val ts = $"Time".cast("timestamp").cast("long")

// Round to 300 seconds interval
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+

If you're interested in a window relative to the current row use window functions:

如果您对当前行的窗口感兴趣,请使用窗口函数:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp 
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if data can partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.

由于性能的原因,这种方法只有在数据可以分割成多个独立的组时才有用。在Spark < 2.0.0中,您还需要HiveContext来使其工作。

#2


0  

For static boundary you can do following:

对于静态边界,您可以做以下工作:

1) Transform (map, mapPartitions etc) Time value to form YYYY-MM-DD-hh-mm where mm is rolled up at 5 minutes level. e.g. 01, 02, 03, 05 becomes 05; 16,17,18,19,20 becomes 20

1)变换(map, mapPartitions等)时间值,形成yyyyy -mm - dd -hh-mm, mm在5分钟的水平上卷起。例如,01 02 03 05变成05;16、17、18、19、20 20

2) Perform groupBy or reduceBy with event_type and time and perform your aggregation(Sum) on metrics

2)使用event_type和time执行groupBy或reduceBy,并对指标执行聚合(Sum)

3) Perform filter transformation to filter metrics > 5

3)对指标>5进行过滤变换

You can write above in spark rdd or dataframe(sql) in almost same way.

您可以使用spark rdd或dataframe(sql)以几乎相同的方式编写上面的代码。

For other type of boundary where 00-05, 01-06, 02-07 you should try looking into concept of sliding window. If your data ingestion use case fits streaming pattern then Spark Streaming API will be perfect otherwise you can find custom solution like this one: Apache Spark - Dealing with Sliding Windows on Temporal RDDs

对于其他类型的边界,比如00-05,01-06,02-07,你应该尝试研究滑动窗口的概念。如果您的数据摄取用例适合流模式,那么Spark流API将是完美的,否则您可以找到这样的自定义解决方案:Apache Spark——处理时态RDDs上的滑动窗口

#1


20  

Spark >= 2.0

火花> = 2.0

You can use window (not to be mistaken with window functions). Depending on a variant it assigns timestamp, to one more, potentially overlapping buckets:

您可以使用窗口(不要错误地使用窗口函数)。根据不同的变体,它为另一个可能重叠的桶分配时间戳:

df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+

Spark < 2.0

火花< 2.0

Lets start with example data:

让我们从示例数据开始:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that event is identified by KEY. If this is not the case you can adjust GROUP BY / PARTITION BY clauses according to your requirements.

我假设事件是由KEY标识的。如果不是这样,您可以根据您的需求按子句对组/分区进行调整。

If you're interested in an aggregation with static window independent of data convert timestamps to a numeric data type and round

如果您对具有独立于数据的静态窗口的聚合感兴趣,那么可以将时间戳转换为数字数据类型和整数

import org.apache.spark.sql.functions.{round, sum}

// cast string to timestamp
val ts = $"Time".cast("timestamp").cast("long")

// Round to 300 seconds interval
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+

If you're interested in a window relative to the current row use window functions:

如果您对当前行的窗口感兴趣,请使用窗口函数:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp 
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if data can partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.

由于性能的原因,这种方法只有在数据可以分割成多个独立的组时才有用。在Spark < 2.0.0中,您还需要HiveContext来使其工作。

#2


0  

For static boundary you can do following:

对于静态边界,您可以做以下工作:

1) Transform (map, mapPartitions etc) Time value to form YYYY-MM-DD-hh-mm where mm is rolled up at 5 minutes level. e.g. 01, 02, 03, 05 becomes 05; 16,17,18,19,20 becomes 20

1)变换(map, mapPartitions等)时间值,形成yyyyy -mm - dd -hh-mm, mm在5分钟的水平上卷起。例如,01 02 03 05变成05;16、17、18、19、20 20

2) Perform groupBy or reduceBy with event_type and time and perform your aggregation(Sum) on metrics

2)使用event_type和time执行groupBy或reduceBy,并对指标执行聚合(Sum)

3) Perform filter transformation to filter metrics > 5

3)对指标>5进行过滤变换

You can write above in spark rdd or dataframe(sql) in almost same way.

您可以使用spark rdd或dataframe(sql)以几乎相同的方式编写上面的代码。

For other type of boundary where 00-05, 01-06, 02-07 you should try looking into concept of sliding window. If your data ingestion use case fits streaming pattern then Spark Streaming API will be perfect otherwise you can find custom solution like this one: Apache Spark - Dealing with Sliding Windows on Temporal RDDs

对于其他类型的边界,比如00-05,01-06,02-07,你应该尝试研究滑动窗口的概念。如果您的数据摄取用例适合流模式,那么Spark流API将是完美的,否则您可以找到这样的自定义解决方案:Apache Spark——处理时态RDDs上的滑动窗口