I have two DataFrames and would like to join them on the date, time, mid and binImbalance fields and collect the corresponding values of timeB and midB into lists.
I tried the following code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_list

val d1: DataFrame // loaded elsewhere
val d3: DataFrame // loaded elsewhere
val d2 = d3
  .withColumnRenamed("date", "dateC")
  .withColumnRenamed("milliSec", "milliSecC")
  .withColumnRenamed("mid", "midC")
  .withColumnRenamed("time", "timeC")
  .withColumnRenamed("binImbalance", "binImbalanceC")
d1.join(d2, d1("date") === d2("dateC") and
            d1("time") === d2("timeC") and
            d1("mid") === d2("midC"))
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list("timeB"), collect_list("midB"))
But this does not work, as I get the error: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35.
On the other hand, if I rename one of the timeB columns, I can no longer collect the values of both into a single list.
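The ambiguity arises because both inputs have a column named timeB (and midB), and after the join the plain string "timeB" could mean either one. A minimal sketch of resolving it without renaming, assuming spark-sql is on the classpath, that both DataFrames share the six columns of the example tables below, and using hypothetical alias names l/r and output names timeB_l, timeB_r, midB_l, midB_r:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AliasJoinExample {
  // Local session for the sketch; assumes spark-sql is on the classpath.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("alias-join-sketch")
    .getOrCreate()
  import spark.implicits._

  // Both inputs keep the original column names, including timeB and midB.
  val cols = Seq("date", "time", "mid", "binImbalance", "timeB", "midB")
  val d1: DataFrame = Seq((1, 1, 10, 1, 4, 10), (2, 2, 20, 2, 5, 11), (3, 3, 30, 3, 6, 12)).toDF(cols: _*)
  val d3: DataFrame = Seq((1, 1, 10, 1, 7, 13), (2, 2, 20, 2, 8, 14), (3, 3, 30, 3, 9, 15)).toDF(cols: _*)

  val keys = Seq("date", "time", "mid", "binImbalance")

  // Aliasing each side lets every column be qualified as l.<name> or r.<name>,
  // so no withColumnRenamed calls are needed and "timeB" is never ambiguous.
  val joinCond = keys.map(k => col(s"l.$k") === col(s"r.$k")).reduce(_ && _)

  val joined: DataFrame = d1.as("l").join(d3.as("r"), joinCond)
    .select((keys.map(k => col(s"l.$k")) ++ Seq(
      col("l.timeB").as("timeB_l"), col("r.timeB").as("timeB_r"),
      col("l.midB").as("midB_l"), col("r.midB").as("midB_r"))): _*)
}
```

Each joined row carries both timeB values side by side. Since collect_list aggregates down the rows of a group, grouping this result by the four keys would still give one element per column; combining two columns of the same row is a job for functions.array rather than collect_list.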
For example, given these two DataFrames:
+----+----+---+------------+-----+----+
|date|time|mid|binImbalance|timeB|midB|
+----+----+---+------------+-----+----+
|   1|   1| 10|           1|    4|  10|
|   2|   2| 20|           2|    5|  11|
|   3|   3| 30|           3|    6|  12|
+----+----+---+------------+-----+----+

+----+----+---+------------+-----+----+
|date|time|mid|binImbalance|timeB|midB|
+----+----+---+------------+-----+----+
|   1|   1| 10|           1|    7|  13|
|   2|   2| 20|           2|    8|  14|
|   3|   3| 30|           3|    9|  15|
+----+----+---+------------+-----+----+

the result should be:
+----+----+---+------------+--------+-------+
|date|time|mid|binImbalance|ListTime|ListMid|
+----+----+---+------------+--------+-------+
|   1|   1| 10|           1|   [4,7]|[10,13]|
|   2|   2| 20|           2|   [5,8]|[11,14]|
|   3|   3| 30|           3|   [6,9]|[12,15]|
+----+----+---+------------+--------+-------+
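Since both inputs have the same columns, one way to get this shape is to stack the rows instead of joining them, so that collect_list can gather timeB and midB across rows sharing a key. This is a swapped-in union-based technique, not the join from the question; a minimal sketch, assuming spark-sql is on the classpath and Spark 2.3+ for unionByName:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

object UnionCollectExample {
  // Local session for the sketch; assumes spark-sql is on the classpath.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("union-collect-sketch")
    .getOrCreate()
  import spark.implicits._

  val cols = Seq("date", "time", "mid", "binImbalance", "timeB", "midB")
  val d1 = Seq((1, 1, 10, 1, 4, 10), (2, 2, 20, 2, 5, 11), (3, 3, 30, 3, 6, 12)).toDF(cols: _*)
  val d3 = Seq((1, 1, 10, 1, 7, 13), (2, 2, 20, 2, 8, 14), (3, 3, 30, 3, 9, 15)).toDF(cols: _*)

  // Stack the rows of both inputs (columns matched by name), then let
  // collect_list gather timeB/midB over every row that shares the key.
  val result = d1.unionByName(d3)
    .groupBy("date", "time", "mid", "binImbalance")
    .agg(
      collect_list("timeB").as("ListTime"),
      collect_list("midB").as("ListMid")
    )
}
```

Two design differences from a join: a key present in only one input still shows up (with a one-element list) instead of being dropped, and collect_list gives no guarantee about element order after the groupBy shuffle, so [4,7] may come back as [7,4].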
Minimal, Complete, and Verifiable example
d1            d2
id  data      id  data
--  ----      --  ----
1   1         1   2
2   4         2   5
3   6         3   3

Result
id  list
--  -----
1   [1,2]
2   [4,5]
3   [6,3]
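In this expected result the list for id 3 is [6,3]: d1's value comes first, so the lists are ordered by source, not sorted. Because collect_list alone makes no ordering promise once groupBy shuffles rows, a sketch that keeps the source order tags each row with a hypothetical helper column src (0 for d1, 1 for d2), sorts the collected (src, data) structs, and then extracts data. It assumes spark-sql on the classpath and recreates the minimal example inline:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, lit, sort_array, struct}

object OrderedCollectExample {
  // Local session for the sketch; assumes spark-sql is on the classpath.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("ordered-collect-sketch")
    .getOrCreate()
  import spark.implicits._

  val d1 = Seq((1, 1), (2, 4), (3, 6)).toDF("id", "data")
  val d2 = Seq((1, 2), (2, 5), (3, 3)).toDF("id", "data")

  // Hypothetical "src" column records where each row came from.
  val tagged = d1.withColumn("src", lit(0))
    .unionByName(d2.withColumn("src", lit(1)))

  // Structs compare field by field, so sorting (src, data) puts d1's value first;
  // getField("data") on an array of structs returns the array of data fields.
  val result = tagged
    .groupBy("id")
    .agg(sort_array(collect_list(struct($"src", $"data"))).as("pairs"))
    .select($"id", $"pairs".getField("data").as("list"))
}
```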
Solution for the minimal example:
import org.apache.spark.sql.functions.udf

// UDF that packs the two values of a joined row into one array
val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x, y))

// Rename d2's columns so nothing is ambiguous after the join
val d3 = d2
  .withColumnRenamed("id", "id3")
  .withColumnRenamed("data", "data3")

val joined = d1.join(d3, d1("id") === d3("id3"))

val result = joined
  .withColumn("list", aggregateDataFrames(joined("data"), joined("data3")))
  .select("id", "list")
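The UDF works, but Spark's built-in functions.array builds the same per-row array without a UDF, and it keeps the element type of the input columns instead of forcing Double. A minimal sketch, assuming spark-sql on the classpath and recreating the minimal example inline; joining on Seq("id") is a swap-in for the renaming step, since a join on a column list keeps a single id column, while the two data columns are told apart through d1(...) and d2(...):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col}

object ArrayColumnExample {
  // Local session for the sketch; assumes spark-sql is on the classpath.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("array-column-sketch")
    .getOrCreate()
  import spark.implicits._

  // The minimal example from the question.
  val d1 = Seq((1, 1), (2, 4), (3, 6)).toDF("id", "data")
  val d2 = Seq((1, 2), (2, 5), (3, 3)).toDF("id", "data")

  // array(...) combines the two data columns of each joined row, d1's value first.
  val result = d1.join(d2, Seq("id"))
    .select(col("id"), array(d1("data"), d2("data")).as("list"))
}
```

Because the array is built per joined row, the element order (d1 then d2) is fixed by the argument order, which matches the [6,3] in the expected result.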