Reputation: 799
I have the following dataset as input:
id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
day1,Daisy,Female
jazz030,Jasmine,Female
Mic002,Michael,Male
I want to split the males and females into two separate output files, as follows:
Dataset for males
id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
Mic002,Michael,Male
Dataset for females
id,name,gender
day1,Daisy,Female
jazz030,Jasmine,Female
I wrote the following Cascading code to do this:
public class Main {
    public static void main(String[] args) {
        Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
        Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
        Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");
        Pipe assembly = new Pipe("inputPipe");
        // ...split into two pipes
        Pipe malePipe = new Pipe("for_male", assembly);
        malePipe = new Each(malePipe, new CustomFilterByGender("male"));
        Pipe femalePipe = new Pipe("for_female", assembly);
        femalePipe = new Each(femalePipe, new CustomFilterByGender("female"));
        // create the flow
        List<Pipe> pipes = new ArrayList<Pipe>(2);
        pipes.add(malePipe);
        pipes.add(femalePipe);
        Tap outputTap = new MultiSinkTap<>(sink_one, sink_two);
        FlowConnector flowConnector = new LocalFlowConnector();
        // the compile-time error is reported on this connect() call
        Flow flow = flowConnector.connect(sourceTap, outputTap, pipes);
        flow.complete();
    }
}
Here, CustomFilterByGender(String gender) is a custom function that keeps only the tuples whose gender matches the value passed as its argument.
Note that I deliberately did not use a custom Buffer, for the sake of efficiency.
With MultiSinkTap I cannot get the desired output: the connect() method of LocalFlowConnector does not accept the MultiSinkTap object, which results in a compile-time error.
Could you suggest changes to the code above that would make it work, or show the correct way to use MultiSinkTap?
Thank you for patiently going through the question :)
Upvotes: 1
Views: 120
Reputation: 657
I think you want to write the output of different pipes to different output files. I made some changes to your code that should do that.
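// Cascading local-mode imports (Cascading 2.x package names)
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Main {
    public static void main(String[] args) {
        Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
        Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
        Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");
        Pipe assembly = new Pipe("inputPipe");
        // branch the input into two named tail pipes
        Pipe malePipe = new Pipe("for_male", assembly);
        malePipe = new Each(malePipe, new CustomFilterByGender("male"));
        Pipe femalePipe = new Pipe("for_female", assembly);
        femalePipe = new Each(femalePipe, new CustomFilterByGender("female"));
        List<Pipe> pipes = new ArrayList<Pipe>(2);
        pipes.add(malePipe);
        pipes.add(femalePipe);
        // map each tail pipe's name to the sink it should write to
        Map<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("for_male", sink_one);
        sinks.put("for_female", sink_two);
        FlowConnector flowConnector = new LocalFlowConnector();
        Flow flow = flowConnector.connect(sourceTap, sinks, pipes);
        flow.complete();
    }
}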
Instead of using MultiSinkTap, you can pass connect() a Map of sinks keyed by the name of each tail pipe, in this case "for_male" (malePipe) and "for_female" (femalePipe).
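To check the two files the flow writes, it can help to compare them against a plain-Java reference split that does not involve Cascading at all. The sketch below is only that: the class name GenderSplitCheck and the method splitByGender are made up for illustration, and the case-insensitive comparison is an assumption, since the question does not show how CustomFilterByGender compares values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Cascading: a plain-Java reference split
// for comparing against maleFile.txt and FemaleFile.txt.
final class GenderSplitCheck {

    // Returns the header line followed by every row whose third column
    // equals the given gender, ignoring case (an assumption).
    static List<String> splitByGender(List<String> lines, String gender) {
        List<String> out = new ArrayList<>();
        if (lines.isEmpty()) {
            return out;
        }
        out.add(lines.get(0)); // keep the header row, as in the desired output files
        for (String line : lines.subList(1, lines.size())) {
            String[] cols = line.split(",", -1);
            if (cols.length == 3 && cols[2].trim().equalsIgnoreCase(gender)) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The sample dataset from the question.
        List<String> input = Arrays.asList(
                "id,name,gender",
                "asinha161,Aniruddha,Male",
                "vic,Victor,Male",
                "day1,Daisy,Female",
                "jazz030,Jasmine,Female",
                "Mic002,Michael,Male");
        splitByGender(input, "male").forEach(System.out::println);
        System.out.println();
        splitByGender(input, "female").forEach(System.out::println);
    }
}
```

If the files produced by the flow differ from this reference, the first thing to look at is probably whether CustomFilterByGender compares "male" against "Male" case-sensitively.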
Upvotes: 4