Reputation: 193
I have the following input data (in Parquet) for a Spark job:
Person (millions of rows)
+---------+----------+---------------+---------------+
| name | location | start | end |
+---------+----------+---------------+---------------+
| Person1 | 1230 | 1478630000001 | 1478630000010 |
| Person2 | 1230 | 1478630000002 | 1478630000012 |
| Person2 | 1230 | 1478630000013 | 1478630000020 |
| Person3 | 3450 | 1478630000001 | 1478630000015 |
+---------+----------+---------------+---------------+
Event (millions of rows)
+----------+----------+---------------+
| event | location | start_time |
+----------+----------+---------------+
| Biking | 1230 | 1478630000005 |
| Skating | 1230 | 1478630000014 |
| Baseball | 3450 | 1478630000015 |
+----------+----------+---------------+
and I need to transform it into the following expected outcome:
[{
"name" : "Biking",
"persons" : ["Person1", "Person2"]
},
{
"name" : "Skating",
"persons" : ["Person2"]
},
{
"name" : "Baseball",
"persons" : ["Person3"]
}]
In words: the result is a list of events, each with a list of the persons who participated in that event.
A person counts as a participant if
Person.start <= Event.start_time
&& Person.end >= Event.start_time
&& Person.location == Event.location
(The bounds are inclusive: in the sample data, Person3's interval ends exactly at Baseball's start_time and Person3 is still counted, and the join code below uses geq/leq accordingly.)
I have tried different approaches, but the only one that actually seems to work is to join the two DataFrames and then group/aggregate them by event. But the join is extremely slow and does not distribute well across multiple CPU cores.
Current code for the join:
import static org.apache.spark.sql.functions.col;

final DataFrame fullFrame = persons.as("persons")
        .join(events.as("events"),
              col("persons.location").equalTo(col("events.location"))
                  .and(col("events.start_time").geq(col("persons.start")))
                  .and(col("events.start_time").leq(col("persons.end"))),
              "inner");
// count() to force an action
fullFrame.count();
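For reference, the group/aggregate step mentioned above looks roughly like this (a sketch, building on the fullFrame from the join; note that collect_list in Spark 1.6 is only available when the DataFrames come from a HiveContext):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

// Collect the participating person names per event.
final DataFrame byEvent = fullFrame
        .groupBy(col("events.event"))
        .agg(collect_list(col("persons.name")).as("persons"));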
I am using Spark Standalone and Java, if this makes a difference.
Does anybody have a better idea how to solve this problem with Spark 1.6.2?
Upvotes: 5
Views: 2596
Reputation: 380
Range joins are executed as a cross product with a subsequent filter step. A potentially better solution could be to broadcast the (presumably much smaller) events table and then map over the persons table: inside the map, check the join condition and produce the respective result.
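A minimal sketch of that idea in Java (assuming jsc is an existing JavaSparkContext, that the schemas match the tables above, and that the full events table fits into executor memory):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

import scala.Tuple2;

// Ship the (smaller) events table to every executor once.
final Broadcast<List<Row>> eventsBc = jsc.broadcast(events.collectAsList());

// For each person row, emit an (event, person) pair for every matching event.
final PairFlatMapFunction<Row, String, String> matchEvents = person -> {
    final List<Tuple2<String, String>> matches = new ArrayList<>();
    final long start = person.<Long>getAs("start");
    final long end = person.<Long>getAs("end");
    for (Row event : eventsBc.value()) {
        final long eventStart = event.<Long>getAs("start_time");
        if (person.getAs("location").equals(event.getAs("location"))
                && start <= eventStart && end >= eventStart) {
            matches.add(new Tuple2<>(event.<String>getAs("event"),
                                     person.<String>getAs("name")));
        }
    }
    return matches; // Spark 1.6's PairFlatMapFunction returns an Iterable
};

// One pass over persons, then group by event to collect the participants.
final JavaPairRDD<String, Iterable<String>> personsByEvent =
        persons.javaRDD().flatMapToPair(matchEvents).groupByKey();

This avoids the shuffle of a full join: each partition of persons is processed independently against the broadcast copy, so the work distributes across cores, at the cost of holding all events in memory on every executor.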
Upvotes: 1