Reputation: 21
I have a PCollection of KV where the key is a filename and the value is some additional info about the file (e.g., the "Source" system that generated it). E.g.,
KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")
I need to read all lines from the files, pair each line with its "Source" field, and return the result as a KV PCollection:
KV(line1 from X1.dat, "SourceX")
KV(line2 from X1.dat, "SourceX")
...
KV(line1 from Y1.dat, "SourceY")
I was able to achieve this by calling FileIO.match() followed by a DoFn in which I sequentially read each file and append its Source value (retrieved from a map passed as a side input).
To get the benefit of parallel reading, could I use TextIO.readAll() to achieve this? TextIO.readAll() returns a plain PCollection of lines, without the filename info. How can I join it back to the filename-to-Source map? I tried the WithKeys transform, but it didn't work.
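For reference, here is a minimal sketch of the FileIO.match()-plus-DoFn approach described above. The class name `ReadLinesWithSourceFn` and the view name `sourceMapView` are illustrative, not from the question; wiring would go through `FileIO.matchAll()` / `FileIO.readMatches()` and `ParDo.of(...).withSideInputs(sourceMapView)`.

```java
import java.io.BufferedReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

// Reads each matched file line by line and tags every line with the
// "Source" value looked up from a side-input map of filename -> Source.
public class ReadLinesWithSourceFn extends DoFn<FileIO.ReadableFile, KV<String, String>> {
  private final PCollectionView<Map<String, String>> sourceMapView;

  public ReadLinesWithSourceFn(PCollectionView<Map<String, String>> sourceMapView) {
    this.sourceMapView = sourceMapView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    FileIO.ReadableFile file = c.element();
    String filename = file.getMetadata().resourceId().toString();
    String source = c.sideInput(sourceMapView).get(filename);
    try (BufferedReader reader =
        new BufferedReader(
            Channels.newReader(file.open(), StandardCharsets.UTF_8.name()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        c.output(KV.of(line, source));
      }
    }
  }
}
```

Note that each file is still read sequentially inside a single DoFn call; parallelism comes only from different files landing on different workers.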
Upvotes: 0
Views: 323
Reputation: 5104
Currently, using FileIO.match() as you are doing is the best way to accomplish this, but once https://github.com/apache/beam/pull/12645 is merged you'll be able to use the new ContextualTextIO transforms.
Note that computing line numbers in a distributed manner is inherently expensive; you might want to see if you can use byte offsets (much easier to compute, and ordered the same as line numbers) instead.
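To illustrate the offset idea: while reading a file sequentially you can track the starting byte offset of each line yourself, with no global coordination. The helper below is a hypothetical sketch, assuming UTF-8 text with single-byte `'\n'` line endings (adjust the `+ 1` for `"\r\n"` files).

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OffsetLines {
  // Pairs each line with its starting byte offset, e.g. "0:first".
  // Assumes UTF-8 content and '\n' line endings.
  public static List<String> withOffsets(String contents) throws Exception {
    List<String> out = new ArrayList<>();
    long offset = 0;
    try (BufferedReader reader = new BufferedReader(new StringReader(contents))) {
      String line;
      while ((line = reader.readLine()) != null) {
        out.add(offset + ":" + line);
        // Advance past this line's bytes plus one byte for the newline.
        offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
      }
    }
    return out;
  }
}
```

For example, `withOffsets("ab\ncd\n")` yields `["0:ab", "3:cd"]`. The offsets sort in the same order as line numbers, which is usually all downstream consumers need.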
Upvotes: 1
Reputation: 11031
If I understand correctly, you want to read the files in parallel? Unfortunately, TextIO.readAll does not have this feature. You will have to use FileIO.match, and then write your DoFn to read the file in the custom way that you want.
This is because you will not be able to do a random seek into a file and preserve the count of line numbers.
Is reading files serially a bottleneck for your pipeline?
Upvotes: 0