Rakesh Shah
Rakesh Shah

Reputation: 46

Implement A-B functionality using Cascading with LEFT JOIN

There are two datasets A and B (having single column - ID)

Cat A

    1
    2
    3
    4
    5
    6
    7

cat B
4
5
2
8
18
19
2197

Cat A-B
1
3
6
7

This subtraction is done in 2 steps step 1: JOIN A BY ID, LEFT JOIN B BY ID) this will give a dataset that has 2 column where 1st column will have all the entries for dataset A and 2nd column will have only the matching entries from B

1   
2   2
3   
4   4
5   5
6   
7   

Step 2: Filter the data set from step1 by records where 2nd field is null Thus we have implemented A-B by using LEFT JOIN.

I am able to execute Step 1 but I am unable to implement step 2. Below is the source code for step 1

public class AMinusB {

public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap,
        Tap outputTap) {
    Pipe bpipe = new Pipe("b_pipe");
    Pipe apipe = new Pipe("a_pipe");
    Fields b_user_id = new Fields("B_id");
    Fields a_user_id = new Fields("A_id");

    Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id,
            new LeftJoin());
    Pipe retainPipe = new Pipe("retain", joinPipe);
    retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id"));

    Pipe cdistPipe = new Pipe("UniquePipe", retainPipe);

    Fields selector = new Fields("A_id", "B_id");

    cdistPipe = new Unique(cdistPipe, selector);

    FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap)
            .addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap)
            .setName("A-B using left outer join");
    return flowDef;
}

public static void main(String[] args) {
    String Apath = "path to data set A";
    String Bpath = "path to data set B";
    String outputPath = "path to output";
    Properties properties = new Properties();
    AppProps.setApplicationJarClass(properties,
            LocationsNumForAProduct.class);
    FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

    Fields A = new Fields("A_id");
    Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath);

    Fields B = new Fields("B_id");
    Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath);

    Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath);

    FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap);
    flowConnector.connect(flowDefLeftJoin).complete();

}

}

Upvotes: 0

Views: 37

Answers (1)

Amit
Amit

Reputation: 1121

Check Operation FilterNull .

cdistPipe = new Each(cdistPipe, selector,new FilterNull());

Upvotes: 0

Related Questions