User27854

Reputation: 884

Implementing Inner Join using Apache Beam

I am in the process of learning the Apache Beam framework. I have implemented an inner join by following the example described in this link, but I don't fully understand how it works.

    // Read Customer Order File
    PCollection<String> pCollection1 = pipeline.apply(TextIO.read().from(
            "src/main/resources/Join/InnerJoin/CustomerOrder.csv"));

    // Read Customer File
    PCollection<String> pCollection2 = pipeline.apply(TextIO.read()
            .from("src/main/resources/Join/InnerJoin/Customer.csv"));

    // Parse customer order data into a PCollection of key-value pairs
    PCollection<KV<String, String>> customerOrderCollection = pCollection1
            .apply(ParDo.of(new CustomerOrderParsing()));

    // Parse customer data into a PCollection of key-value pairs
    PCollection<KV<String, String>> customerCollection = pCollection2
            .apply(ParDo.of(new CustomerParsing()));

    // Create TupleTag objects
    TupleTag<String> customerOrderrTuple = new TupleTag<String>();
    TupleTag<String> customerTuple = new TupleTag<String>();

    

I am unable to understand why we are doing the following. From the documentation, I gather that it aggregates all the elements by key, but why do we need this?

    PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
            .of(customerOrderrTuple, customerOrderCollection).and(customerTuple, customerCollection)
            .apply(CoGroupByKey.<String>create());

I also don't know what's happening here:

    PCollection<String> output = result.apply(ParDo.of(
            new DoFn<KV<String, CoGbkResult>, String>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    String strKey = context.element().getKey();
                    CoGbkResult valueObject = context.element().getValue();
                    Iterable<String> customerOrderTable = valueObject.getAll(customerOrderrTuple);
                    Iterable<String> customerTable = valueObject.getAll(customerTuple);

                    for (String order : customerOrderTable) {
                        for (String user : customerTable) {
                            context.output(strKey + "," + order + "," + user);
                        }
                    }
                }
            }));

Upvotes: 1

Views: 1302

Answers (1)

Alexey Romanenko

Reputation: 1443

    PCollection<KV<String, CoGbkResult>> result = KeyedPCollectionTuple
            .of(customerOrderrTuple, customerOrderCollection).and(customerTuple, customerCollection)
            .apply(CoGroupByKey.<String>create());

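This is where your join actually happens. It returns a PCollection of key-value pairs, where each key is a join key and each value (a CoGbkResult) contains all the values for that key from every joined collection. You then need to iterate over the values of all joined collections for the same key and combine them the way you need; that is why there is another DoFn downstream that does this in a user-specific way. In your DoFn, the nested loop emits one row per (order, customer) pair, so a key that is missing from either collection produces no output, which is what makes it an inner join.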
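To make this concrete, here is a minimal plain-Java sketch (no Beam dependency, so it only illustrates the semantics) of what the two steps do together. The hypothetical `coGroup` method plays the role of CoGroupByKey, collecting the values from each input into a separate list per key (roughly what `getAll(tag)` returns from a CoGbkResult), and `innerJoin` mirrors your DoFn's nested loop. The class, the method names and the sample data are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoGroupJoinSketch {

    // Hypothetical stand-in for CoGbkResult: one list of values per input.
    static final class Grouped {
        final List<String> orders = new ArrayList<>();
        final List<String> customers = new ArrayList<>();
    }

    // Mimics CoGroupByKey: every key seen in EITHER input gets an entry, even
    // when the other input has no values for it. LinkedHashMap only keeps the
    // demo output predictable; Beam makes no ordering guarantee.
    static Map<String, Grouped> coGroup(List<String[]> orders, List<String[]> customers) {
        Map<String, Grouped> result = new LinkedHashMap<>();
        for (String[] kv : orders) {
            result.computeIfAbsent(kv[0], k -> new Grouped()).orders.add(kv[1]);
        }
        for (String[] kv : customers) {
            result.computeIfAbsent(kv[0], k -> new Grouped()).customers.add(kv[1]);
        }
        return result;
    }

    // Mirrors the downstream DoFn: a cross product of both sides per key.
    // An empty side means the inner loop never runs, so the key is dropped.
    static List<String> innerJoin(Map<String, Grouped> grouped) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Grouped> e : grouped.entrySet()) {
            for (String order : e.getValue().orders) {
                for (String user : e.getValue().customers) {
                    out.add(e.getKey() + "," + order + "," + user);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> orders = Arrays.asList(
                new String[] {"c1", "order-A"},
                new String[] {"c1", "order-B"},
                new String[] {"c3", "order-C"}); // c3 has no customer record
        List<String[]> customers = Arrays.asList(
                new String[] {"c1", "Alice"},
                new String[] {"c2", "Bob"});     // c2 has no orders
        // Prints c1,order-A,Alice and c1,order-B,Alice
        innerJoin(coGroup(orders, customers)).forEach(System.out::println);
    }
}
```

Keys `c2` and `c3` show up in the grouped result but produce no output. If you instead emitted the order with an empty customer whenever the customer list is empty, the same grouping would give you a left outer join; that choice is the user-specific part the downstream DoFn controls.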

I agree that this is quite a verbose and potentially error-prone API (since it's quite low-level). Have you considered using schema-aware PCollections? With that API, joins and other relational operations are much easier to express.

For example, an inner join may look like this (much simpler), and the same transform already supports other join types:

    PCollection<Transaction> transactions = readTransactions();
    PCollection<Review> reviews = readReviews();
    PCollection<Row> joined = transactions.apply(
        Join.innerJoin(reviews).using("userId", "productId"));
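The `using("userId", "productId")` call joins on both fields together: a transaction and a review match only when `userId` and `productId` are both equal. As a rough plain-Java illustration of that matching rule (a simple hash join swapped in for brevity; it is not how Beam implements `Join`, and the `quantity`/`stars` fields and class names are hypothetical), consider:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyJoinSketch {

    // Hypothetical row types: only userId and productId come from the snippet;
    // quantity and stars are made-up payload fields.
    static final class Transaction {
        final String userId;
        final String productId;
        final int quantity;

        Transaction(String userId, String productId, int quantity) {
            this.userId = userId;
            this.productId = productId;
            this.quantity = quantity;
        }
    }

    static final class Review {
        final String userId;
        final String productId;
        final int stars;

        Review(String userId, String productId, int stars) {
            this.userId = userId;
            this.productId = productId;
            this.stars = stars;
        }
    }

    // Composite key: List.equals/hashCode compare element by element,
    // so two keys are equal only if BOTH fields are equal.
    static List<String> key(String userId, String productId) {
        return Arrays.asList(userId, productId);
    }

    // Hash join (chosen for brevity, not Beam's implementation):
    // index reviews by composite key, then probe with each transaction.
    static List<String> innerJoin(List<Transaction> transactions, List<Review> reviews) {
        Map<List<String>, List<Review>> index = new HashMap<>();
        for (Review r : reviews) {
            index.computeIfAbsent(key(r.userId, r.productId), k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Transaction t : transactions) {
            List<Review> matches =
                    index.getOrDefault(key(t.userId, t.productId), Collections.emptyList());
            for (Review r : matches) {
                out.add(t.userId + "," + t.productId + "," + t.quantity + "," + r.stars);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Transaction> transactions = Arrays.asList(
                new Transaction("u1", "p1", 2),
                new Transaction("u1", "p2", 1),  // no review for (u1, p2)
                new Transaction("u2", "p1", 5)); // u2 reviewed p2, not p1
        List<Review> reviews = Arrays.asList(
                new Review("u1", "p1", 4),
                new Review("u2", "p2", 3));
        // Prints u1,p1,2,4
        innerJoin(transactions, reviews).forEach(System.out::println);
    }
}
```

Note that the `u2` transaction is dropped even though a review with the same `userId` exists, because its `productId` differs; joining on `userId` alone would have matched it.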

Upvotes: 3
