Reputation: 46
There are two datasets, A and B, each with a single column (ID). I want to compute A-B, i.e. the IDs that are in A but not in B.
cat A
1
2
3
4
5
6
7
cat B
4
5
2
8
18
19
2197
A-B (expected result)
1
3
6
7
This subtraction is done in two steps.
Step 1: Left join A with B on ID (A LEFT JOIN B BY ID). This gives a dataset with two columns, where the first column has all the entries from dataset A and the second column has only the matching entries from B:
1
2 2
3
4 4
5 5
6
7
Step 2: Filter the dataset from step 1, keeping only the records where the second field is null. The first column of what remains is A-B, implemented using a LEFT JOIN.
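The two steps above can be sketched in plain Java on the sample data, independent of Cascading. This is only an illustration under stated assumptions: the class and method names (`LeftJoinDifference`, `leftJoin`, `keepUnmatched`) are hypothetical, rows are modelled as two-element `Integer[]` arrays, and IDs in B are assumed unique, so each row of A produces exactly one joined row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of the two steps; class and method names are hypothetical.
class LeftJoinDifference {

    // Step 1: left join A with B on ID.
    // Each row is {aId, bId}; bId is null when B has no matching ID.
    static List<Integer[]> leftJoin(List<Integer> a, List<Integer> b) {
        Set<Integer> bIds = new HashSet<>(b);
        List<Integer[]> joined = new ArrayList<>();
        for (Integer id : a) {
            joined.add(new Integer[] { id, bIds.contains(id) ? id : null });
        }
        return joined;
    }

    // Step 2: keep only rows whose second field is null, then project the first field.
    static List<Integer> keepUnmatched(List<Integer[]> joined) {
        List<Integer> result = new ArrayList<>();
        for (Integer[] row : joined) {
            if (row[1] == null) {
                result.add(row[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<Integer> b = Arrays.asList(4, 5, 2, 8, 18, 19, 2197);
        System.out.println(keepUnmatched(leftJoin(a, b))); // prints [1, 3, 6, 7]
    }
}
```

The point to notice is the direction of step 2: rows with a null second field are the ones that are kept, not the ones that are discarded.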
I am able to execute step 1, but I am unable to implement step 2. Below is the source code for step 1:
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.pipe.joiner.LeftJoin;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class AMinusB {

    public static FlowDef createWorkflowLeftJoin(Tap aTap, Tap bTap,
            Tap outputTap) {
        Pipe bpipe = new Pipe("b_pipe");
        Pipe apipe = new Pipe("a_pipe");
        Fields b_user_id = new Fields("B_id");
        Fields a_user_id = new Fields("A_id");

        // Step 1: left join A with B; B_id is null where B has no match.
        Pipe joinPipe = new HashJoin(apipe, a_user_id, bpipe, b_user_id,
                new LeftJoin());

        Pipe retainPipe = new Pipe("retain", joinPipe);
        retainPipe = new Retain(retainPipe, new Fields("A_id", "B_id"));

        // Remove duplicate (A_id, B_id) pairs.
        Pipe cdistPipe = new Pipe("UniquePipe", retainPipe);
        Fields selector = new Fields("A_id", "B_id");
        cdistPipe = new Unique(cdistPipe, selector);

        FlowDef flowDef = FlowDef.flowDef().addSource(apipe, aTap)
                .addSource(bpipe, bTap).addTailSink(cdistPipe, outputTap)
                .setName("A-B using left outer join");
        return flowDef;
    }

    public static void main(String[] args) {
        String Apath = "path to data set A";
        String Bpath = "path to data set B";
        String outputPath = "path to output";

        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, AMinusB.class);
        FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

        Fields A = new Fields("A_id");
        Tap ATap = new Hfs(new TextDelimited(A, false, "\t"), Apath);
        Fields B = new Fields("B_id");
        Tap BTap = new Hfs(new TextDelimited(B, false, "\t"), Bpath);
        Tap outputTap = new Hfs(new TextDelimited(false, "\t"), outputPath);

        FlowDef flowDefLeftJoin = createWorkflowLeftJoin(ATap, BTap, outputTap);
        flowConnector.connect(flowDefLeftJoin).complete();
    }
}
Upvotes: 0
Views: 37
Reputation: 1121
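Check the FilterNull and FilterNotNull operations in cascading.operation.filter. Note the naming: FilterNull discards every tuple whose argument values contain a null, which is the opposite of step 2. To keep only the rows where B_id is null, apply FilterNotNull to B_id, which discards the tuples where B_id has a value:
cdistPipe = new Each(cdistPipe, new Fields("B_id"), new FilterNotNull());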
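Because the filter names are easy to read backwards, here is a small plain-Java model of the polarity. It does not use any Cascading classes; `FilterPolarity`, `apply`, `REMOVE_IF_NULL` and `REMOVE_IF_NOT_NULL` are hypothetical names. It assumes, as I read the Cascading Javadoc, that a filter drops a tuple when its check answers "remove", that FilterNull removes tuples containing a null, and that FilterNotNull removes tuples containing a non-null value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of filter polarity; it does not use Cascading classes.
class FilterPolarity {

    // Models a filter whose predicate answers "remove this row?" (true means drop).
    static List<Integer> apply(List<Integer[]> rows, Predicate<Integer[]> isRemove) {
        List<Integer> keptAIds = new ArrayList<>();
        for (Integer[] row : rows) {
            if (!isRemove.test(row)) {
                keptAIds.add(row[0]);
            }
        }
        return keptAIds;
    }

    // Models FilterNull on B_id: drops rows whose B_id is null.
    static final Predicate<Integer[]> REMOVE_IF_NULL = row -> row[1] == null;

    // Models FilterNotNull on B_id: drops rows whose B_id is not null.
    static final Predicate<Integer[]> REMOVE_IF_NOT_NULL = row -> row[1] != null;

    public static void main(String[] args) {
        // The step 1 output from the question, as {A_id, B_id} rows.
        List<Integer[]> joined = Arrays.asList(
                new Integer[] { 1, null }, new Integer[] { 2, 2 },
                new Integer[] { 3, null }, new Integer[] { 4, 4 },
                new Integer[] { 5, 5 }, new Integer[] { 6, null },
                new Integer[] { 7, null });

        System.out.println(apply(joined, REMOVE_IF_NULL));     // prints [2, 4, 5]
        System.out.println(apply(joined, REMOVE_IF_NOT_NULL)); // prints [1, 3, 6, 7]
    }
}
```

Only the second variant yields A-B; the first yields the intersection of A and B.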
Upvotes: 0