Aniruddha Sinha

Reputation: 799

Generate multiple output files using MultiSinkTap

I have the following dataset as input:

id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
day1,Daisy,Female
jazz030,Jasmine,Female
Mic002,Michael,Male

I want to split the males and females into two separate output files, as follows.
Dataset for males

id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
Mic002,Michael,Male

Dataset for females

id,name,gender
day1,Daisy,Female
jazz030,Jasmine,Female

I wrote the following Cascading code, which is supposed to do this:

public class Main {

      public static void main(String[] args) {
          Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
          Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
          Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");

          Pipe assembly = new Pipe("inputPipe");


          // ...split into two pipes
          Pipe malePipe = new Pipe("for_male", assembly);
          malePipe=new Each(malePipe,new CustomFilterByGender("male"));
          Pipe femalePipe = new Pipe("for_female", assembly);
          femalePipe=new Each(femalePipe, new CustomFilterByGender("female"));
          // create the flow
           List<Pipe> pipes = new ArrayList<Pipe>(2)
        {{pipes.add(countOne);
          pipes.add(countTwo);}};

          Tap outputTap=new MultiSinkTap<>(sink_one,sink_two);

          FlowConnector flowConnector = new LocalFlowConnector();
          Flow flow = flowConnector.connect(sourceTap, outputTap, pipes);
          flow.complete();
      }

Here CustomFilterByGender(String gender) is a custom function that returns only the tuples whose gender matches the value passed as the argument.
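For illustration, the row-level test that such a filter needs can be sketched in plain Java, without any Cascading types. The class name GenderMatch and its methods are hypothetical, and the case-insensitive comparison is an assumption, made because the constructor arguments above are lowercase ("male") while the data uses "Male".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Cascading: decides whether a CSV row
// of the form id,name,gender belongs to the requested gender.
final class GenderMatch {
    private final String gender;

    GenderMatch(String gender) {
        this.gender = gender;
    }

    // Assumption: gender is the third comma-separated column and is
    // compared case-insensitively, so "male" matches "Male".
    boolean keep(String csvRow) {
        String[] columns = csvRow.split(",", -1);
        return columns.length >= 3 && gender.equalsIgnoreCase(columns[2].trim());
    }

    // Returns the matching rows, preserving input order.
    List<String> select(List<String> rows) {
        List<String> kept = new ArrayList<>();
        for (String row : rows) {
            if (keep(row)) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

A whole-value comparison is used rather than a substring check, since "Female" contains "male" and would otherwise land in both files.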

Note that I have deliberately avoided a custom Buffer, for the sake of efficiency.
With MultiSinkTap I cannot get the desired output: the connect() method of the LocalFlowConnector object does not accept the MultiSinkTap object, which results in a compile-time error.
I would appreciate suggestions for changes that make the code above work, or an explanation of how to use MultiSinkTap correctly.
Thank you for patiently going through the question :)

Upvotes: 1

Views: 120

Answers (1)

Vaijnath Polsane

Reputation: 657

I think you want to write the output of different pipes to different output files. I made some changes to your code that should do that.

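import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

// CustomFilterByGender is the custom operation from the question.
public class Main {
  public static void main(String[] args) {
      Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
      Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
      Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");

      Pipe assembly = new Pipe("inputPipe");

      // Split the input into two named branches.
      Pipe malePipe = new Pipe("for_male", assembly);
      malePipe = new Each(malePipe, new CustomFilterByGender("male"));
      Pipe femalePipe = new Pipe("for_female", assembly);
      femalePipe = new Each(femalePipe, new CustomFilterByGender("female"));

      // The tails of the assembly, one per branch.
      List<Pipe> pipes = new ArrayList<Pipe>(2);
      pipes.add(malePipe);
      pipes.add(femalePipe);

      // Map each branch name to the sink that receives its output.
      Map<String, Tap> sinks = new HashMap<String, Tap>();
      sinks.put("for_male", sink_one);
      sinks.put("for_female", sink_two);

      FlowConnector flowConnector = new LocalFlowConnector();
      Flow flow = flowConnector.connect(sourceTap, sinks, pipes);
      flow.complete();
  }
}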

Instead of using MultiSinkTap, you can pass connect() a Map of sinks keyed by pipe name. Each key must match the name of the tail pipe whose output it should receive, in this case "for_male" for malePipe and "for_female" for femalePipe.
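The name-based binding can be pictured with a small plain-Java model. This is only a sketch of the idea, not Cascading's implementation: SinkBinding and bind are hypothetical names, and plain strings stand in for pipes and taps.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Cascading code): each tail pipe is
// represented by its name and each sink by its file path.
final class SinkBinding {
    // Pairs every tail name with the sink registered under the same key,
    // failing fast if a tail has no sink.
    static Map<String, String> bind(List<String> tailNames, Map<String, String> sinks) {
        Map<String, String> bound = new LinkedHashMap<>();
        for (String name : tailNames) {
            String sink = sinks.get(name);
            if (sink == null) {
                throw new IllegalArgumentException("no sink registered for tail: " + name);
            }
            bound.put(name, sink);
        }
        return bound;
    }
}
```

With the tails "for_male" and "for_female" and the map from the code above, bind pairs them with maleFile.txt and FemaleFile.txt. If the real flow fails to connect, a key that does not match a tail pipe's name is probably the first thing to check.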

Upvotes: 4
