Reputation: 179
I want to count the total number of rows in a file. Please explain your code if possible.
String fileAbsolutePath = "gs://sourav_bucket_dataflow/" + fileName;
PCollection<String> data = p.apply("Reading Data From File", TextIO.read().from(fileAbsolutePath));
PCollection<Long> count = data.apply(Count.<String>globally());
Now I want to get the value of count.
Upvotes: 2
Views: 1991
Reputation: 11
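Here input is the PCollection<String> you read from the file (data in your code). This prints the count. Note that the println runs on whichever worker executes the step: with the DirectRunner it shows up in your console, while on Dataflow it ends up in the worker logs.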
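// Count.globally() turns all rows into a single Long
PCollection<Long> number = input.apply(Count.<String>globally());
number.apply(MapElements.via(new SimpleFunction<Long, Long>() {
    @Override
    public Long apply(Long total) {
        // Executed on a worker, not in the program that builds the pipeline
        System.out.println("Length is: " + total);
        // Pass the value through unchanged
        return total;
    }
}));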
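To explain what Count.globally() does with the rows: it is a combining transform, so each worker counts the rows in its share of the file and the partial counts are then merged into one Long (0 for an empty file). The plain-Java sketch below mimics that shape without any Beam dependency; the class and method names are hypothetical and only loosely modeled on the accumulator methods of Beam's CombineFn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java stand-in for a counting combine (not Beam API).
// Each "shard" of rows is counted independently, then the partial counts are merged.
public class CountSketch {

    // Like createAccumulator(): every shard starts at zero.
    static long createAccumulator() {
        return 0L;
    }

    // Like addInput(): one more row seen in this shard.
    static long addInput(long acc, String row) {
        return acc + 1;
    }

    // Like mergeAccumulators(): add up the partial counts from different workers.
    static long mergeAccumulators(List<Long> partials) {
        long total = 0L;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    // Counts every row across all shards, producing one number like Count.globally().
    public static long countAll(List<List<String>> shards) {
        List<Long> partials = new ArrayList<>();
        for (List<String> shard : shards) {
            long acc = createAccumulator();
            for (String row : shard) {
                acc = addInput(acc, row);
            }
            partials.add(acc);
        }
        return mergeAccumulators(partials);
    }

    public static void main(String[] args) {
        List<List<String>> shards = List.of(
                List.of("row1", "row2"),
                List.of("row3"),
                List.of());
        System.out.println("Length is: " + countAll(shards)); // prints "Length is: 3"
    }
}
```

Because merging only needs addition, the shards can be counted in parallel and combined in any order, which is why the count scales to large files.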
Upvotes: 1
Reputation: 1314
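It depends on what you want to do with that number. A PCollection is not a regular Java value: it only gets its contents when the pipeline runs, so you cannot read it directly in the code that builds the pipeline. If you want to use the count in later transforms, convert it to a PCollectionView and pass it to those transforms as a side input.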
PCollection<String> data = p.apply("Reading Data From File", TextIO.read().from(fileAbsolutePath));
PCollection<Long> count = data.apply(Count.<String>globally());
final PCollectionView<Long> view = count.apply(View.asSingleton());
Here is a quick example showing how to use the value as a side input:
data.apply(ParDo.of(new FuncFn(view)).withSideInputs(view));
Where:
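// Nested class; static so the DoFn does not capture the enclosing
// (possibly non-serializable) class when Beam serializes it
static class FuncFn extends DoFn<String, String> {
    private final PCollectionView<Long> mySideInput;

    public FuncFn(PCollectionView<Long> mySideInput) {
        this.mySideInput = mySideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // The row count computed by Count.globally(), readable for every element
        Long count = c.sideInput(mySideInput);
        // other stuff you may want to do with c.element() and count
    }
}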
Hope that helps!
Upvotes: 1
Reputation: 3010
There are a variety of sinks you can use to get data out of your pipeline; https://beam.apache.org/documentation/io/built-in/ lists the current built-in I/O transforms.
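One way to get the number back into your driver program with a sink: map the Long to a String (for example with MapElements.into(TypeDescriptors.strings())), write it with TextIO.write().withoutSharding() so a single file is produced, call p.run().waitUntilFinish(), and then read that file. The plain-Java sketch below covers only the last step. It assumes the output file is on the local filesystem (a gs:// path would need the Cloud Storage client instead) and that the file holds the number on one line; the ReadCount class and readCount helper are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper: reads back a count that a pipeline wrote with a text sink,
// assuming a single local file whose first non-empty line is the number.
public class ReadCount {

    public static long readCount(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        for (String line : lines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                return Long.parseLong(trimmed);
            }
        }
        throw new IOException("No count found in " + file);
    }

    public static void main(String[] args) throws IOException {
        // Simulate the sink's output file, then read the count back.
        Path tmp = Files.createTempFile("row-count", ".txt");
        Files.write(tmp, "42\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(readCount(tmp)); // prints 42
        Files.delete(tmp);
    }
}
```

Reading the file only makes sense after waitUntilFinish() returns; before that, the sink may not have written anything yet.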
Upvotes: 1