Reputation: 349
I have CSV files being pushed to Google Cloud Storage and a Pub/Sub subscription that notifies me when they arrive. What I'm trying to accomplish is writing a Beam program that grabs the JSON data from the Pub/Sub subscription, parses out the file location, then reads the CSV file from GCS and processes it. I already have a step that reads the Pub/Sub messages and processes them into a PCollection. So far I have this:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(
    PubsubIO.readStrings().fromSubscription(StaticValueProvider.of("beamsub")));
PCollection<String> files = input.apply(ParDo.of(new ParseOutGSFiles()));
Now I need to do something like this:
pipeline.apply("ReadLines", TextIO.read().from(FILEsFROMEARLIER).withCompressionType(TextIO.CompressionType.GZIP))
Any ideas, or is this not possible? It seems like it should be easy.
Thanks in advance
Upvotes: 0
Views: 859
Reputation: 511
The natural way to express your read is with the TextIO.readAll() method, which reads text files from an input PCollection of file names. This method has been added to the Beam codebase but is not in a released version yet; it will be included in the Beam 2.2.0 release and the corresponding Dataflow 2.2.0 release.
Your resulting code would look something like this:
Options options = PipelineOptionsFactory.fromArgs(args)
.withValidation().as(Options.class);
final String output = options.getOutput();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> files = pipeline
.apply(PubsubIO.readStrings().fromSubscription("beamsub"))
.apply(ParDo.of(new ParseOutGSFiles()));
PCollection<String> contents = files
.apply(TextIO.readAll().withCompressionType(TextIO.CompressionType.GZIP));
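For reference, the ParseOutGSFiles DoFn is assumed to turn each Pub/Sub notification payload into a gs:// path. A GCS object-change notification carries the object's bucket and name in its JSON body, so the core of that DoFn could be sketched as below. The class name and the field-extraction helper are hypothetical, and real code should use a JSON library such as Jackson or Gson instead of the plain string scanning used here to keep the sketch dependency-free:

```java
// Hypothetical sketch of the core logic inside ParseOutGSFiles. A GCS
// object-change notification delivered over Pub/Sub carries the object's
// "bucket" and "name" fields in its JSON payload; joining them yields the
// gs:// path that TextIO can read.
public class ParseOutGSFilesSketch {

  // Extracts the value of a top-level string field, e.g. "bucket": "my-bucket".
  static String jsonField(String json, String field) {
    String key = "\"" + field + "\"";
    int i = json.indexOf(key);
    if (i < 0) {
      return null;
    }
    int start = json.indexOf('"', i + key.length()) + 1; // opening quote of the value
    int end = json.indexOf('"', start);                  // closing quote of the value
    return json.substring(start, end);
  }

  // Builds the gs:// path that TextIO.readAll() expects from one notification.
  static String extractGcsPath(String notificationJson) {
    String bucket = jsonField(notificationJson, "bucket");
    String name = jsonField(notificationJson, "name");
    return "gs://" + bucket + "/" + name;
  }

  // Inside the Beam DoFn this would be used roughly as:
  //   @ProcessElement
  //   public void processElement(ProcessContext c) {
  //     c.output(extractGcsPath(c.element()));
  //   }

  public static void main(String[] args) {
    String json = "{\"bucket\": \"my-bucket\", \"name\": \"incoming/data.csv.gz\"}";
    System.out.println(extractGcsPath(json)); // gs://my-bucket/incoming/data.csv.gz
  }
}
```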
Upvotes: 1