Reputation: 1072
I have a simple Apache Beam pipeline which reads compressed bz2 files and writes them out to text files.
import apache_beam as beam

p1 = beam.Pipeline()
(p1
 | 'read' >> beam.io.ReadFromText('bad_file.bz2')
 | 'write' >> beam.io.WriteToText('file_out.txt')
)
p1.run()
The problem arises when the pipeline encounters a bad file. Most of my bad files are malformed, not in bz2 format, or simply empty, which confuses the decompressor and causes an OSError: Invalid data stream.
How can I tell ReadFromText to pass on these?
Upvotes: 0
Views: 452
Reputation: 5104
You may want to filter your files and then use apache_beam.io.textio.ReadAllFromText.
For example
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io import filesystem
from apache_beam.io import filesystems


def is_valid_bz2(path):
    """Return True if the start of the file decompresses as bzip2."""
    try:
        with filesystems.FileSystems.open(
                path,
                compression_type=filesystem.CompressionTypes.BZIP2) as handle:
            # Decompressing one byte is enough to trip on a malformed stream.
            handle.read(1)
        return True
    except Exception:
        return False


with beam.Pipeline() as p:
    (p
     | 'match' >> fileio.MatchFiles("/path/to/*.bz2")
     | 'filter' >> beam.Filter(lambda m: is_valid_bz2(m.path))
     # Pass plain path strings on to ReadAllFromText.
     | 'path' >> beam.Map(lambda m: m.path)
     | 'read' >> beam.io.textio.ReadAllFromText()
     | 'write' >> beam.io.WriteToText('file_out.txt'))
Here is_valid_bz2 uses the filesystems utilities so that it can read from all supported filesystems.
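If your files are all on local disk, the same kind of check can be sketched with only the standard-library bz2 module, which makes it easy to try outside a pipeline. This is a minimal sketch, not part of Beam: the name is_valid_bz2_local is hypothetical, and treating empty files as invalid is an assumption based on the question.

```python
import bz2
import os


def is_valid_bz2_local(path):
    """Return True if a local file looks like a readable bz2 stream.

    Hypothetical helper: works on local paths only, unlike the
    FileSystems-based check in the answer.
    """
    try:
        # Assumption: an empty file counts as a bad file.
        if os.path.getsize(path) == 0:
            return False
        with bz2.open(path, "rb") as handle:
            # Decompressing one byte fails with OSError on non-bz2 data.
            handle.read(1)
        return True
    except (OSError, EOFError):
        # OSError also covers missing or unreadable files.
        return False
```

For local paths it could be used as the predicate in the 'filter' step, for example beam.Filter(lambda m: is_valid_bz2_local(m.path)).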
Upvotes: 2