user413787

Reputation: 1

dataflow read files in DoFn with output as stream of lines

Dataflow Streaming Pipeline:

Some source uploads compressed files to GCS -> an upload event (gs:///folder/file.gz) is sent to PubSub -> the Dataflow streaming pipeline reads the file event from PubSub I/O -> a DoFn un-gzips the file

static class CustomDoFn extends DoFn<String, String> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
        String gcsPath = c.element();
        GcsUtil gcsUtil = c.getPipelineOptions().as(GcsOptions.class).getGcsUtil();
        // Open a ReadChannel against GCS and decompress it as a stream of lines.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Channels.newInputStream(
                    gcsUtil.open(GcsPath.fromUri(gcsPath)))), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                c.output(line); // Is this a good way to read and send each line down the pipeline?
            }
        }
    }
}

// to-be Pipeline

 pipeline.apply(PubSubIO.Read())
         .apply(ParDo.of(new CustomDoFn()))
         .apply(new CustomTX())
         .apply(BigQueryIO.Write());

Doubts are:
1. Is it correct to produce output in a loop inside a DoFn?
2. How can I make use of FileBasedSource.FileBasedReader inside a DoFn?

Upvotes: 0

Views: 1395

Answers (1)

Charles Chen

Reputation: 346

Currently there is no way to use a FileBasedSource with dynamic filenames (i.e. file names not specified at pipeline construction). A future improvement to Apache Beam 2.0 (https://issues.apache.org/jira/browse/BEAM-65) will enable this feature but is not yet ready for use. Your outlined approach will, as Alex Amato pointed out, be memory-constrained for large files, but should otherwise produce a functioning pipeline.
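To illustrate why streaming line by line keeps memory use bounded by line length rather than file size, here is a minimal, self-contained sketch of the decompress-and-read-lines mechanics, independent of GCS and the Dataflow SDK (the in-memory gzip payload, the `GzipLines` class, and the `readLines` helper are illustrative, not part of the question's code):

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class GzipLines {
    // Read a gzipped byte stream line by line; only one line is held in memory
    // at a time. Inside the DoFn, `lines.add(line)` would be `c.output(line)`.
    static List<String> readLines(InputStream gzipped) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a small gzipped payload in memory to stand in for the GCS object.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), "UTF-8")) {
            w.write("first\nsecond\nthird\n");
        }
        List<String> lines = readLines(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(lines); // [first, second, third]
    }
}
```

Note that while decompression itself is streamed, a streaming runner may still buffer all outputs of a single bundle before committing them, so very large files per element can still pressure the worker.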

Upvotes: 0
