如何从数据框中为一对键选择最新数据?

时间:2023-01-17 15:42:05

I am new in Spark/Scala world, and I have a question regarding data selection from dataframes. I have a table with the following data, and I need to choose for each cust and user_id pair, all the last modified records, with max modify_time:

我是Spark / Scala世界的新手,我对数据框中的数据选择有疑问。我有一个包含以下数据的表,我需要为每个cust和user_id对选择所有最后修改的记录,并使用max modify_time:

Original data frame:

原始数据框:

+--------+----------+------------+--------------------+
|  cust  | user_id  | another_id |     modify_time    | 
+--------+----------+------------+--------------------+
|   cust1|   1      |         222|2017-03-22 07:29    |
|   cust1|   1      |         111|2017-03-22 07:29    |
|   cust2|   2      |         111|2017-03-21 07:29    |
|   cust1|   1      |         333|2017-03-21 07:29    |
|   cust2|   2      |         444|2017-03-22 07:29    |
|   cust2|   2      |         333|2017-03-22 07:29    |
+--------+----------+------------+--------------------+

The required result:

所需结果:

+--------+----------+------------+--------------------+
|  cust  | user_id  | another_id |     modify_time    | 
+--------+----------+------------+--------------------+
|   cust1|   1      |         222|2017-03-22 07:29    |
|   cust1|   1      |         111|2017-03-22 07:29    |
|   cust2|   2      |         444|2017-03-22 07:29    |
|   cust2|   2      |         333|2017-03-22 07:29    |
+--------+----------+------------+--------------------+

What is the most efficient way to do so?

最有效的方法是什么?

1 个解决方案

#1


0  

I did the following and it gave me the expected result:

我做了以下,它给了我预期的结果:

val custUserModifyTime = df
      .groupBy($"cust", $"user_id").agg(max($"modify_time")).collect()

 val mostRecent: Seq[DataFrame] = custUserModifyTime.map(x => df.select("*")
      .where("cust = '" + x.getAs[String]("cust") + "'" +
        " AND user_id = '" + x.getAs[String]("user_id") + "'" +
        " AND modify_time = '" + x.getAs[Timestamp]("max(modify_time)") + "'"))

val unifiedMostUpdatedData = mostRecent.reduce((a, b) => a.union(b))

#1


0  

I did the following and it gave me the expected result:

我做了以下,它给了我预期的结果:

val custUserModifyTime = df
      .groupBy($"cust", $"user_id").agg(max($"modify_time")).collect()

 val mostRecent: Seq[DataFrame] = custUserModifyTime.map(x => df.select("*")
      .where("cust = '" + x.getAs[String]("cust") + "'" +
        " AND user_id = '" + x.getAs[String]("user_id") + "'" +
        " AND modify_time = '" + x.getAs[Timestamp]("max(modify_time)") + "'"))

val unifiedMostUpdatedData = mostRecent.reduce((a, b) => a.union(b))