igrigorik


How to process a GCS filepattern one full file at a time?

I need to process a (GCS) bucket of files, where each file is compressed and contains a single multi-line JSON record. Also, the name of the file being processed is significant and I need to know it within my transform.

Starting from the examples in the docs, TextIO looks pretty close, but it seems to be designed to process each file line by line and does not let me read the entire file at once. I also don't see any way to get the name of the file being processed.

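// MyJson / MyJsonCoder: my own JSON type and its coder.
// TextIO.Read yields a PCollection (not a PCollectionTuple), and each
// element is decoded from a single line, not from a whole file.
PCollection<MyJson> results = p.apply(TextIO.Read
    .from("gs://bucket/a/*.gz")
    .withCompressionType(TextIO.CompressionType.GZIP)
    .withCoder(MyJsonCoder.of()));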

It looks like I need to write a custom IO reader or something similar. Any tips on the best place to start?


Answers (1)

jkff


You are correct that none of the existing classes currently do exactly what you want. There are two reasonable approaches:

  • Match the filepattern yourself (using IOChannelUtils and IOChannelFactory) and wrap the resulting filenames into a PCollection<String> using Create.of(filenames). Then apply a ParDo with a function that reads each file given its filename.
  • Write your own subclass of Source (there's also FileBasedSource, but it's not quite right for your use case). It would be configured with the filepattern, and its splitIntoBundles would match the filepattern and expand into individual sources, each corresponding to one file.

I would recommend the first approach: it should take less code, and your use case does not need the full power of Source.
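A minimal sketch of the per-file read at the heart of the first approach, assuming the Dataflow SDK 1.x IOChannel API: the filenames would come from IOChannelUtils.getFactory(pattern).match(pattern), be turned into a PCollection<String> with Create.of(filenames), and the ParDo would open each one with IOChannelUtils.getFactory(filename).open(filename), which returns a ReadableByteChannel. The code below covers only what happens to that channel, so it runs without the SDK. The class and method names (WholeFileReader, readGzipFully, readRecord) are hypothetical, and a plain Map.Entry stands in for the KV<String, String> a real DoFn would output.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.AbstractMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class WholeFileReader {

  /** Decompresses the whole gzip stream behind the channel into a single UTF-8 String. */
  public static String readGzipFully(ReadableByteChannel channel) throws IOException {
    try (InputStream in = new GZIPInputStream(Channels.newInputStream(channel))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  /**
   * Pairs the filename with the file's full contents. Inside a DoFn's processElement,
   * the filename would be the input element and the channel would come from the
   * IOChannelFactory (assumed SDK 1.x API, not exercised here).
   */
  public static Map.Entry<String, String> readRecord(String filename, ReadableByteChannel channel)
      throws IOException {
    return new AbstractMap.SimpleImmutableEntry<>(filename, readGzipFully(channel));
  }

  public static void main(String[] args) throws IOException {
    // A local gzipped, multi-line JSON file stands in for one object in the GCS bucket.
    Path tmp = Files.createTempFile("record", ".json.gz");
    String json = "{\n  \"id\": 1,\n  \"name\": \"example\"\n}\n";
    try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(tmp))) {
      out.write(json.getBytes(StandardCharsets.UTF_8));
    }
    try (ReadableByteChannel ch = Files.newByteChannel(tmp, StandardOpenOption.READ)) {
      Map.Entry<String, String> record = readRecord(tmp.toString(), ch);
      System.out.println(record.getKey() + " -> " + record.getValue().length() + " chars");
    }
    Files.delete(tmp);
  }
}
```

Because each element of the PCollection is a filename rather than a line, the runner can still distribute files across workers, while every file is read in one piece and its name stays available to the transform.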

