Reputation: 1
Dataflow streaming pipeline:
Some source uploads compressed files to GCS -> an upload event (gs:///folder/file.gz) is sent to Pub/Sub -> the Dataflow streaming job reads the file event via the PubSub I/O -> a DoFn un-gzips the file
static class CustomDoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        String gcsPath = c.element();
        // Open a read channel for the GCS object (Beam FileSystems API),
        // then un-gzip the stream and read it line by line
        ReadableByteChannel channel =
            FileSystems.open(FileSystems.matchSingleFileSpec(gcsPath).resourceId());
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Channels.newInputStream(channel)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                c.output(line); // Is this a good way to read and send each line down the pipeline?
            }
        }
    }
}
// to-be pipeline
pipeline
    .apply(PubsubIO.readStrings())
    .apply(ParDo.of(new CustomDoFn()))
    .apply(new CustomTX())
    .apply(BigQueryIO.write());
My doubts are:
1. Is emitting output in a loop inside a DoFn the correct approach?
2. How can I make use of FileBasedSource.FileBasedReader inside a DoFn?
Upvotes: 0
Views: 1395
Reputation: 346
Currently there is no way to use a FileBasedSource with dynamic filenames (i.e., filenames not specified at pipeline construction time). A future improvement to Apache Beam (https://issues.apache.org/jira/browse/BEAM-65) will enable this, but it is not yet ready for use. As Alex Amato pointed out, your outlined approach will be memory-constrained for large files, but it should otherwise produce a functioning pipeline.
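To illustrate the streaming decompression the DoFn relies on, here is a plain-JDK sketch (no Beam dependency; the class name `GzipLines` and its helpers are hypothetical). Reading through a `GZIPInputStream` with a `BufferedReader` holds only one buffered line in memory at a time, though a single element's processing must still work through the whole file:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.*;

public class GzipLines {
    // Decompress gzip bytes and collect the lines, reading in a streaming
    // fashion: only one buffered line is held in memory at a time.
    static List<String> readLines(byte[] gz) throws IOException {
        List<String> out = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gz)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                out.add(line); // in the DoFn this would be c.output(line)
            }
        }
        return out;
    }

    // Helper to gzip a string in memory for the demo.
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            gos.write(s.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readLines(gzip("a\nb\nc"))); // [a, b, c]
    }
}
```

In the actual DoFn the `ByteArrayInputStream` is replaced by the stream obtained from the GCS read channel; the line-by-line loop is the same.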
Upvotes: 0