Sourav Chatterjee

Reputation: 179

How to count the number of rows in a file in Dataflow in an efficient way?

I want to count the total number of rows in a file. Please explain your code if possible.

    String fileAbsolutePath = "gs://sourav_bucket_dataflow/" + fileName;

    PCollection<String> data = p.apply("Reading Data From File", TextIO.read().from(fileAbsolutePath));

    PCollection<Long> count = data.apply(Count.<String>globally());

Now I want to get the value of the count.

Upvotes: 2

Views: 1991

Answers (3)

Arpan Solanki

Reputation: 11

Here "input" on the first line is the input PCollection (the lines read from the file). The following prints the count:

    PCollection<Long> number = input.apply(Count.globally());
    number.apply(MapElements.via(new SimpleFunction<Long, Long>() {
        @Override
        public Long apply(Long total) {
            // Runs on a worker, so on a distributed runner the output lands in the worker logs.
            System.out.println("Length is: " + total);
            return total;
        }
    }));

Upvotes: 1

Haris Nadeem

Reputation: 1314

It sort of depends on what you want to do with that number. Assuming you want to use it in your future transformations, you may want to convert it to a PCollectionView object and pass it as a side input to other transformations.

    PCollection<String> data = p.apply("Reading Data From File", TextIO.read().from(fileAbsolutePath));
    PCollection<Long> count = data.apply(Count.<String>globally());    

    final PCollectionView<Long> view = count.apply(View.asSingleton());

A quick example to show you how to use the value as a side input:

    data.apply(ParDo.of(new FuncFn(view)).withSideInputs(view));

Where:

    class FuncFn extends DoFn<String,String>
    {
      private final PCollectionView<Long> mySideInput;

      public FuncFn(PCollectionView<Long> mySideInput) {
          this.mySideInput = mySideInput;
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws IOException
      {        
        Long count = c.sideInput(mySideInput);
        //other stuff you may want to do
      }
    }

Hope that helps!

Upvotes: 1

danielm

Reputation: 3010

There are a variety of sinks that you can use to get data out of your pipeline. The current built-in IO transforms are listed at https://beam.apache.org/documentation/io/built-in/.
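
As a rough sketch of the sink approach, the count could be formatted as a string and written out with TextIO.write(). This is only one possible way to do it, not necessarily what the asker ended up using. The class name RowCountToFile, the helper formatCount, the file name input.txt, and the output prefix row_count are all made up for illustration; only the bucket name comes from the question.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class RowCountToFile {

    // Hypothetical helper: formats the count as the single line to write out.
    static String formatCount(long total) {
        return "Total rows: " + total;
    }

    public static void main(String[] args) {
        // Assumed paths for illustration; replace with your own file and output prefix.
        String inputPath = "gs://sourav_bucket_dataflow/input.txt";
        String outputPrefix = "gs://sourav_bucket_dataflow/row_count";

        Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Read the file line by line and count all lines globally.
        PCollection<Long> count = p
            .apply("Reading Data From File", TextIO.read().from(inputPath))
            .apply("Count Rows", Count.<String>globally());

        // Convert the single Long to a String and write it as one unsharded file.
        count
            .apply("Format Count",
                MapElements.into(TypeDescriptors.strings())
                    .via((Long total) -> formatCount(total)))
            .apply("Write Count",
                TextIO.write().to(outputPrefix).withSuffix(".txt").withoutSharding());

        p.run().waitUntilFinish();
    }
}
```

With withoutSharding() the result should end up in a single file under the given prefix, which you can then read back outside the pipeline.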

Upvotes: 1
