Reputation: 794
I have two datasets with a common key column and I want to perform a left join on them. Is there a function in Apache Beam that performs a left join?
Upvotes: 4
Views: 6742
Reputation: 32
Example of left outer join:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;

public class TestJoin {
    @Test
    public void left_join_example() {
        Pipeline pipeline = Pipeline.create();

        // Left side: every key here appears in the output at least once.
        PCollection<KV<Integer, String>> leftCollection =
            pipeline.apply(Create.of(KV.of(1, "a"), KV.of(2, "b"), KV.of(3, "c")));
        // Right side: key 1 has two values; key 4 has no match on the left.
        PCollection<KV<Integer, Integer>> rightCollection =
            pipeline.apply(Create.of(KV.of(1, 1), KV.of(1, 10), KV.of(2, 2), KV.of(4, 4)));

        // The third argument (0) is the value used for the right side
        // when a left key has no matching right record.
        PCollection<KV<Integer, KV<String, Integer>>> leftJoinCollection =
            Join.leftOuterJoin(leftCollection, rightCollection, 0);

        // Key 3 is paired with the default 0; key 4 is dropped.
        PAssert.that(leftJoinCollection).containsInAnyOrder(
            KV.of(1, KV.of("a", 1)), KV.of(1, KV.of("a", 10)),
            KV.of(2, KV.of("b", 2)), KV.of(3, KV.of("c", 0)));

        pipeline.run().waitUntilFinish();
    }
}
Upvotes: 1
Reputation: 2539
There is a small library of joins available in the Beam Java SDK; see if the implementation works for you: org.apache.beam.sdk.extensions.joinlibrary.Join
Update
You can implement it yourself with a similar approach, using CoGroupByKey:
- put both PCollections into a KeyedPCollectionTuple;
- apply a CoGroupByKey, which groups elements from both PCollections per key per window;
- apply a ParDo that loops over the results of the CoGroupByKey, joins left and right records one at a time, and emits the results (see the CoGroupByKey example in the Beam Programming Guide).
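To make the pairing logic in the last step concrete, here is a minimal sketch in plain Java with no Beam dependency. It is not the join library's code: the class LeftJoinSketch and its methods groupByKey and leftOuterJoin are hypothetical names made up for illustration, and in-memory lists stand in for PCollections. In a real pipeline the per-key iterables would come from the CoGbkResult (for example via getAll with each side's TupleTag) inside the ParDo, and the grouping would be done by CoGroupByKey per key per window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: in-memory lists stand in for PCollections.
class LeftJoinSketch {

    // Groups values by key, loosely mimicking what CoGroupByKey does for one side.
    public static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Left outer join: every left record is kept. A left record whose key has no
    // right-side match is paired with nullValue (which must itself be non-null,
    // because Map.entry rejects nulls).
    public static <K, L, R> List<Map.Entry<K, Map.Entry<L, R>>> leftOuterJoin(
            List<Map.Entry<K, L>> left, List<Map.Entry<K, R>> right, R nullValue) {
        Map<K, List<R>> rightByKey = groupByKey(right);
        List<Map.Entry<K, Map.Entry<L, R>>> out = new ArrayList<>();
        for (Map.Entry<K, List<L>> group : groupByKey(left).entrySet()) {
            K key = group.getKey();
            List<R> rights = rightByKey.getOrDefault(key, List.of());
            for (L leftValue : group.getValue()) {
                if (rights.isEmpty()) {
                    // No match on the right: emit the left record with the default.
                    out.add(Map.entry(key, Map.entry(leftValue, nullValue)));
                } else {
                    // One output per (left, right) pair sharing this key.
                    for (R rightValue : rights) {
                        out.add(Map.entry(key, Map.entry(leftValue, rightValue)));
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left =
            List.of(Map.entry(1, "a"), Map.entry(2, "b"), Map.entry(3, "c"));
        List<Map.Entry<Integer, Integer>> right =
            List.of(Map.entry(1, 1), Map.entry(1, 10), Map.entry(2, 2), Map.entry(4, 4));
        System.out.println(leftOuterJoin(left, right, 0));
    }
}
```

With the same inputs as the other answer's test, this produces four joined records: two for key 1, one for key 2, and one for key 3 paired with the default 0. Key 4 exists only on the right, so it is dropped.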
Upvotes: 4