I need to process a Google Cloud Storage (GCS) bucket of files, where each file is compressed and contains a single multi-line JSON record. The name of each file is also significant, and I need to know it within my transform.
Starting from the examples in the docs, TextIO looks pretty close, but it seems to be designed to process each file line by line and doesn't let me read an entire file at once. I also don't see any way to get the name of the file being processed.
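```java
// Dataflow SDK 1.x: TextIO emits one element per line of each matched file,
// decoded with MyJsonCoder (MyJson is a placeholder for the type it decodes to).
PCollection<MyJson> results = p.apply(TextIO.Read
    .from("gs://bucket/a/*.gz")
    .withCompressionType(TextIO.CompressionType.GZIP)
    .withCoder(MyJsonCoder.of()));
```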
It looks like I need to write a custom IO reader or something similar. Any tips on the best place to start?
You are correct that right now none of the existing classes do exactly what you want. There are 2 reasonable approaches:
1. Create a `PCollection<String>` where each String is a filename, using `Create.of(filenames)`. Then apply a `ParDo` with a function that reads the entire file with the given name.
2. Implement a custom `Source` whose `splitIntoBundles` would match the filepattern and expand into individual sources, each corresponding to one file.

I would recommend the first approach because it seems like less code, and your use case does not require the full power of `Source`.
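Here is a minimal plain-Java sketch of the first approach. It assumes local files stand in for the GCS bucket and deliberately leaves out the Dataflow SDK wiring. `matchFiles` expands a glob into the list of names you would hand to `Create.of`. `readWholeGzip` is the per-element work the `ParDo` would do: it decompresses the entire file so the multi-line JSON record arrives as one string, paired with its file name. The class, method, and field names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * Hypothetical helper illustrating approach 1 without the Dataflow SDK.
 * Local paths stand in for gs:// paths (an assumption of this sketch).
 */
final class WholeGzipFiles {

    /** A file's name paired with its entire decompressed contents. */
    static final class NamedContents {
        final String fileName;
        final String contents;

        NamedContents(String fileName, String contents) {
            this.fileName = fileName;
            this.contents = contents;
        }
    }

    /** Lists files in dir whose names match a glob such as "*.gz", sorted by path. */
    static List<Path> matchFiles(Path dir, String glob) {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                matches.add(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(matches);
        return matches;
    }

    /** Decompresses one gzip file completely, so a multi-line record stays intact. */
    static NamedContents readWholeGzip(Path file) {
        try (InputStream in = new GZIPInputStream(Files.newInputStream(file))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new NamedContents(file.getFileName().toString(),
                    new String(out.toByteArray(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real pipeline, the matched names would be turned into strings for `Create.of`. The body of `readWholeGzip` would move into the `DoFn`, reading through a GCS client instead of `java.nio.file` (not shown). Emitting the file name together with the contents is what gives the transform access to the name of each file.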