Reputation: 884
I am in the process of learning the Apache Beam framework. I have implemented an inner join by looking at the example described in this link, but I am unable to fully understand how it works.
// Read CustomerOrder file
PCollection<String> pCollection1 = pipeline.apply(TextIO.read()
        .from("src/main/resources/Join/InnerJoin/CustomerOrder.csv"));

// Read Customer file
PCollection<String> pCollection2 = pipeline.apply(TextIO.read()
        .from("src/main/resources/Join/InnerJoin/Customer.csv"));

// Parse CustomerOrder data into a PCollection of key-value pairs
PCollection<KV<String, String>> customerOrderCollection = pCollection1
        .apply(ParDo.of(new CustomerOrderParsing()));

// Parse Customer data into a PCollection of key-value pairs
PCollection<KV<String, String>> customerCollection = pCollection2
        .apply(ParDo.of(new CustomerParsing()));

// Create TupleTag objects
TupleTag<String> customerOrderrTuple = new TupleTag<String>();
TupleTag<String> customerTuple = new TupleTag<String>();
I am unable to understand why we are doing this. On referring to the documentation, I could gather that we are aggregating all the elements based on the key, but why do we need to do that here?
PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
        .of(customerOrderrTuple, customerOrderCollection)
        .and(customerTuple, customerCollection)
        .apply(CoGroupByKey.<String>create());
I also don't understand what is happening here:
PCollection<String> output = result.apply(ParDo.of(
        new DoFn<KV<String, CoGbkResult>, String>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                String strKey = context.element().getKey();
                CoGbkResult valueObject = context.element().getValue();
                Iterable<String> customerOrderTable = valueObject.getAll(customerOrderrTuple);
                Iterable<String> customerTable = valueObject.getAll(customerTuple);
                for (String order : customerOrderTable) {
                    for (String user : customerTable) {
                        context.output(strKey + "," + order + "," + user);
                    }
                }
            }
        }));
Upvotes: 1
Views: 1302
Reputation: 1443
PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
        .of(customerOrderrTuple, customerOrderCollection)
        .and(customerTuple, customerCollection)
        .apply(CoGroupByKey.<String>create());
This is actually where your join happens. It returns a PCollection of key-value pairs, where each key is a join key and each value is a CoGbkResult holding all the values from every input collection for that key. You then need to iterate over those values for the same key and combine them in whatever way you need, which is why there is another DoFn downstream that performs this in a user-specific way.
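For instance, here is a minimal, self-contained sketch of what the grouped result looks like before your DoFn runs. The sample data and tag names are made up, and the usual Beam imports (Create, KV, TupleTag, KeyedPCollectionTuple, CoGroupByKey, CoGbkResult) are assumed:
TupleTag<String> ordersTag = new TupleTag<String>(){};
TupleTag<String> customersTag = new TupleTag<String>(){};

// Sample keyed inputs, standing in for your parsed CSV data
PCollection<KV<String, String>> ordersKv = pipeline.apply("SampleOrders",
        Create.of(KV.of("C001", "O-1"), KV.of("C001", "O-2"), KV.of("C002", "O-3")));
PCollection<KV<String, String>> customersKv = pipeline.apply("SampleCustomers",
        Create.of(KV.of("C001", "Alice"), KV.of("C002", "Bob")));

PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
        .of(ordersTag, ordersKv)
        .and(customersTag, customersKv)
        .apply(CoGroupByKey.create());

// `grouped` conceptually contains one element per key, with the values of both
// inputs bundled under their TupleTags:
//   KV("C001", CoGbkResult{ordersTag: ["O-1", "O-2"], customersTag: ["Alice"]})
//   KV("C002", CoGbkResult{ordersTag: ["O-3"],        customersTag: ["Bob"]})
// Your DoFn then reads both Iterables with getAll(tag) and emits the per-key
// cross product ("C001,O-1,Alice", "C001,O-2,Alice", "C002,O-3,Bob"), which is
// exactly an inner join on the customer id.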
I agree that this is quite a verbose and potentially error-prone API (since it's quite low-level), so have you considered using schema-aware PCollections? With that API, joins and all the other relational operations are much easier to do.
For example, an inner join may look like this (much simpler), and it already supports different types of joins:
PCollection<Transaction> transactions = readTransactions();
PCollection<Review> reviews = readReviews();

PCollection<Row> joined = transactions.apply(
        Join.innerJoin(reviews).using("userId", "productId"));
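Applied to your data it could look roughly like the sketch below. The CustomerOrder/Customer POJOs, their fields, and the ParseCustomerOrderFn/ParseCustomerFn DoFns are assumptions for illustration, not part of your code; annotating the classes with @DefaultSchema lets Beam infer their schemas automatically:
// Illustrative POJOs with inferred schemas (class and field names are assumptions)
@DefaultSchema(JavaFieldSchema.class)
public class CustomerOrder {
    public String customerId;
    public String orderId;
}

@DefaultSchema(JavaFieldSchema.class)
public class Customer {
    public String customerId;
    public String name;
}

// Assuming your parsing DoFns emit these POJOs instead of KV pairs
// (ParseCustomerOrderFn and ParseCustomerFn are hypothetical)
PCollection<CustomerOrder> orders = pCollection1.apply(ParDo.of(new ParseCustomerOrderFn()));
PCollection<Customer> customers = pCollection2.apply(ParDo.of(new ParseCustomerFn()));

// Inner join on the shared field; each output Row nests the matching
// CustomerOrder and Customer values
PCollection<Row> joined = orders.apply(
        Join.<CustomerOrder, Customer>innerJoin(customers).using("customerId"));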
Upvotes: 3