mastro

How to use collect_list?

I have two DataFrames and would like to join them on the date, time, mid, and binImbalance fields, collecting the corresponding timeB and midB values into lists.

I have tried with the following code:

val d1: DataFrame  // existing DataFrame
val d3: DataFrame  // existing DataFrame
val d2 = d3
  .withColumnRenamed("date", "dateC")
  .withColumnRenamed("milliSec", "milliSecC")
  .withColumnRenamed("mid", "midC")
  .withColumnRenamed("time", "timeC")
  .withColumnRenamed("binImbalance", "binImbalanceC")

d1.join(d2, d1("date") === d2("dateC") and
            d1("time") === d2("timeC") and
            d1("mid") === d2("midC"))
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list("timeB"), collect_list("midB"))

But this does not work; I get the error: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35. At the same time, if I rename one of the timeB columns, I can no longer collect the values from both DataFrames into a single list.

For example, given these two input DataFrames:

+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    4    |  10  |
|  2  |     2   |  20  |           2|    5    |  11  |
|  3  |     3   |  30  |           3|    6    |  12  |
+-----+---------+------+------------+---------+------+

+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    7    |  13  |
|  2  |     2   |  20  |           2|    8    |  14  |
|  3  |     3   |  30  |           3|    9    |  15  |
+-----+---------+------+------------+---------+------+

the result should be:

+-----+---------+------+------------+---------+-----------+
| date|     time|   mid|binImbalance| ListTime|  ListMid  |
+-----+---------+------+------------+---------+-----------+
|  1  |     1   |  10  |           1|   [4,7] |  [10,13]  |
|  2  |     2   |  20  |           2|   [5,8] |  [11,14]  |
|  3  |     3   |  30  |           3|   [6,9] |  [12,15]  |
+-----+---------+------+------------+---------+-----------+

Minimal, Complete, and Verifiable example

d1            d2
id   data     id   data    
--   ----     --   ----
1    1        1    2
2    4        2    5
3    6        3    3

Result
id   list
--   ----
1    [1,2]
2    [4,5]
3    [6,3]

Answers (1)

mastro

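Solution for the minimal example: rename the columns of the second DataFrame, join on id, and combine the two data columns with a UDF that returns both values as a Seq: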

import org.apache.spark.sql.functions.udf

// UDF that packs the two values of a matched row into one sequence
val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x, y))

// Rename d2's columns so they do not clash with d1's after the join
val d3 = d2
  .withColumnRenamed("id", "id3")
  .withColumnRenamed("data", "data3")

val joined = d1.join(d3, d1("id") === d3("id3"))

val result = joined
  .withColumn("list", aggregateDataFrames(joined("data"), joined("data3")))
  .select("id", "list")
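
For comparison, here is a rough sketch of two alternatives on the same minimal example. It is not what I originally ran: it assumes Spark 2.0 or later (for SparkSession and union), and the object and method names (CollectListSketch, pairAsArray, collectPerId) are made up for illustration. The first variant keeps the join but uses the built-in array function instead of a UDF. The second stacks the two DataFrames and groups by id, which is closer to the collect_list idea from the question and avoids the ambiguous column reference because no join is involved.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, collect_list}

// Hypothetical helper object, for illustration only.
object CollectListSketch {

  // Variant 1: join on id, then pair the two data columns with the built-in array().
  def pairAsArray(d1: DataFrame, d2: DataFrame): DataFrame = {
    val d3 = d2
      .withColumnRenamed("id", "id3")
      .withColumnRenamed("data", "data3")
    d1.join(d3, d1("id") === d3("id3"))
      .select(d1("id"), array(d1("data"), d3("data3")).as("list"))
  }

  // Variant 2: stack the rows of both DataFrames, then collect per id.
  // The order of elements inside each list is not guaranteed.
  def collectPerId(d1: DataFrame, d2: DataFrame): DataFrame =
    d1.union(d2)
      .groupBy("id")
      .agg(collect_list("data").as("list"))

  def main(args: Array[String]): Unit = {
    // Local session, assumed here only to make the sketch runnable.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collect-list-sketch")
      .getOrCreate()
    import spark.implicits._

    val d1 = Seq((1, 1), (2, 4), (3, 6)).toDF("id", "data")
    val d2 = Seq((1, 2), (2, 5), (3, 3)).toDF("id", "data")

    pairAsArray(d1, d2).orderBy("id").show()
    collectPerId(d1, d2).orderBy("id").show()

    spark.stop()
  }
}
```

The union variant only needs both DataFrames to have the same columns in the same order, and it extends to more than two inputs. If the order inside each list matters, the array variant is the safer choice, since collect_list makes no ordering promise after the shuffle.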
