Baskar
Baskar

Reputation: 41

Implementing Cartesian join in Cascading

I would like to know is it possible that we can do Cartesian join in Cascading. If anyone can give a simple clear example to understand Cartesian join in cascading?

Upvotes: 1

Views: 71

Answers (1)

Dhruv Pancholi
Dhruv Pancholi

Reputation: 144

Use the following SubAssembly for doing Cartesian Join:

/**
 * Created by dhruv.pancholi on 16/01/17.
 */
public class CartesianJoin extends SubAssembly {

    public static class CommonFieldAddOperation extends BaseOperation implements Function, Serializable {

        public CommonFieldAddOperation(Fields outputFields) {
            super(outputFields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();

            // Copying the same tuple from input
            Tuple tuple = new Tuple(arguments.getTuple());

            // Adding 1 for joining on this field
            tuple.add(1);

            functionCall.getOutputCollector().add(tuple);
        }
    }

    public CartesianJoin(Pipe leftPipe, Fields leftFields, Pipe rightPipe, Fields rightFields) {

        // Adding 1 at the end of each tuple for joining
        leftPipe = new Each(leftPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(leftFields, new Fields("cartesian_common"))), Fields.RESULTS);

        // Adding 1 at the end of each tuple for joining
        rightPipe = new Each(rightPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(rightFields, new Fields("cartesian_common_"))), Fields.RESULTS);

        // Joining on the 1 which was added in both the pipes
        Pipe joinPipe = new CoGroup(leftPipe, new Fields("cartesian_common"), rightPipe, new Fields("cartesian_common_"), new InnerJoin());

        // Keeping only the original fields
        joinPipe = new Retain(joinPipe, Fields.merge(leftFields, rightFields));

        // Adding output pipe of the sub-assembly
        setTails(joinPipe);
    }

}

Use the following code snippet in the main function or wherever flow is defined:

Pipe joinPipe = new CartesianJoin(leftPipe, new Fields("id", "name"), rightPipe, new Fields("id_", "name_"));

leftPipe

id  name
1   dhruv
3   arun

rightPipe

id_ name_
1   dhruv
2   gaj

joinPipe

id  name    id_ name_
3   arun    2   gaj
3   arun    1   dhruv
1   dhruv   2   gaj
1   dhruv   1   dhruv

Upvotes: 0

Related Questions