Reputation: 23
I'm having trouble understanding if the joins in Apache Beam (e.g. http://www.waitingforcode.com/apache-beam/joins-apache-beam/read) can join entire rows.
For example:
I have 2 datasets, in CSV format, where the first rows are column headers.
The first:
a,b,c,d
1,2,3,4
5,6,7,8
1,2,5,4
The second:
c,d,e,f
3,4,9,10
I want to left join on columns c and d so that I end up with:
a,b,c,d,e,f
1,2,3,4,9,10
5,6,7,8,,
1,2,5,4,,
However, all the documentation on Apache Beam seems to say the PCollection objects need to be of type KV<K, V> when joining, so I have broken down my PCollection objects into collections of KV<String, String> objects (where the key is the column header and the value is the row value). But in that case (where you just have a key with a value) I don't see how the row format can be maintained. How would KV(c,7) "know" that KV(a,5) is from the same row? Is Join meant for this sort of thing at all?
My code so far:
PCollection<KV<String, String>> flightOutput = ...;
PCollection<KV<String, String>> arrivalWeatherDataForJoin = ...;
PCollection<KV<String, KV<String, String>>> output = Join.leftOuterJoin(flightOutput, arrivalWeatherDataForJoin, "");
Upvotes: 2
Views: 2969
Reputation: 2539
Yes, Join is the utility class to help with joins like yours. It is a wrapper around CoGroupByKey; see the corresponding section in the docs. Its implementation is pretty short, and its tests might also have helpful examples.
The problem in your case is likely caused by how you're choosing the keys.
The KeyT in KV<KeyT, V1> in the Join library represents the key you use to match the records; it contains all the join fields. So in your case you will probably need to assign keys something like this (pseudocode):
pCollection1:
Key Value
(3,4) (1,2,3,4)
(7,8) (5,6,7,8)
(5,4) (1,2,5,4)
pCollection2:
Key Value
(3,4) (3,4,9,10)
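A minimal sketch of how such composite keys could be derived from the CSV lines, written in plain Java without any Beam dependency. The class name `JoinKeys` and the method `keyOf` are hypothetical, and the column positions are assumptions taken from the sample data (c and d are columns 2 and 3 in the first file, and columns 0 and 1 in the second). It also assumes simple CSV with no quoted fields.

```java
import java.util.ArrayList;
import java.util.List;

class JoinKeys {
    // Builds a composite join key from the given column indices of one CSV line.
    // Assumption: fields contain no quotes or embedded commas.
    static String keyOf(String csvLine, int... keyColumns) {
        String[] fields = csvLine.split(",", -1); // -1 keeps trailing empty fields
        List<String> parts = new ArrayList<>();
        for (int col : keyColumns) {
            parts.add(fields[col]);
        }
        return String.join(",", parts);
    }

    public static void main(String[] args) {
        // Left rows are keyed on columns c,d (indices 2 and 3).
        System.out.println(keyOf("1,2,3,4", 2, 3));  // 3,4
        System.out.println(keyOf("5,6,7,8", 2, 3));  // 7,8
        // Right rows are keyed on columns c,d (indices 0 and 1).
        System.out.println(keyOf("3,4,9,10", 0, 1)); // 3,4
    }
}
```

In a pipeline, a function like this could be applied to every line (skipping the header) with a keying transform such as `WithKeys` or `MapElements`, so that each collection becomes a `PCollection<KV<String, String>>` of (join key, whole row).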
The result of the join will then look something like this (pseudocode):
joinResultPCollection:
Key Value
(3,4) (1,2,3,4),(3,4,9,10)
(7,8) (5,6,7,8),nullValue
(5,4) (1,2,5,4),nullValue
So you will probably need to add another transform after join to actually merge the left and right side into a combined row.
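One way that merge step could look, again as a plain-Java sketch rather than a definitive implementation. The class `MergeRows` and the method `merge` are hypothetical names. It assumes that the right-hand row starts with the join columns (as in the sample data) and that an unmatched right side arrives as `null` or as the empty string, which is the null value passed to `Join.leftOuterJoin` in the question.

```java
import java.util.Arrays;

class MergeRows {
    // Merges a left row with an optional right row into one CSV line.
    // keyWidth:   number of leading join columns in the right row (dropped to avoid duplicates).
    // rightWidth: total number of columns in a right row (used for padding when there is no match).
    static String merge(String left, String right, int keyWidth, int rightWidth) {
        String[] tail;
        if (right == null || right.isEmpty()) {
            // No match: pad with empty fields for the missing right-hand columns.
            tail = new String[rightWidth - keyWidth];
            Arrays.fill(tail, "");
        } else {
            String[] fields = right.split(",", -1);
            tail = Arrays.copyOfRange(fields, keyWidth, fields.length);
        }
        return left + "," + String.join(",", tail);
    }

    public static void main(String[] args) {
        System.out.println(merge("1,2,3,4", "3,4,9,10", 2, 4)); // 1,2,3,4,9,10
        System.out.println(merge("5,6,7,8", "", 2, 4));         // 5,6,7,8,,
    }
}
```

With the `PCollection<KV<String, KV<String, String>>>` from the question, the left row would be `kv.getValue().getKey()` and the right row `kv.getValue().getValue()`, so a function like this could be called from a `MapElements` or `ParDo` step after the join.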
Because you have a CSV, you could probably use actual strings like "3,4" as keys (and values). Or you could use List<> or your own custom row types.
For example, this is exactly what the Beam SQL join implementation does.
Upvotes: 3