Reputation: 122
I am trying to write Parquet files using dynamic destinations via the WriteToFiles
class.
I even found a more developed example, like this one, where they build a custom Avro file sink.
I am currently trying to use the pyarrow
library to write a Parquet sink that can handle the write operation in a distributed way, similarly to how the WriteToParquet PTransform does it. This is the class I came up with:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam.io import fileio


class ParquetFileSink(fileio.FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec
        self.writer = None

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = pq.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=False
        )

    def write(self, record):
        # This is called on every element.
        row = pa.Table.from_pandas(
            pd.DataFrame(record), schema=self._schema, preserve_index=False
        )
        self.writer.write_table(row)

    def flush(self):
        pass
The main issue is that, as far as I know, it is not possible to write unbounded PCollections as Parquet files, so if I try to use the class above to write record by record, I either get an error for writing to a closed file handler, or some files are simply not created.
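For reference, this is roughly how I wire the sink into an unbounded pipeline; the Pub/Sub source, the parse step, the output path and the schema below are just illustrative placeholders, not my actual job.

import json

import apache_beam as beam
import pyarrow as pa
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# Placeholder schema matching the records produced by the parse step.
arrow_schema = pa.schema([('some_key', pa.string()), ('raw', pa.string())])

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = (p
         | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
         | 'Parse' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8')))
         | 'Window' >> beam.WindowInto(FixedWindows(60))
         | 'Write' >> fileio.WriteToFiles(
             path='gs://my-bucket/output',
             destination=lambda record: record['some_key'],
             sink=lambda dest: ParquetFileSink(schema=arrow_schema)))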
I also tried to write batches using a GroupByKey
PTransform; however, since it is not possible to close the pyarrow.parquet.ParquetWriter
object, files end up only partially written and corrupted. Moreover, this strategy is not safe, as batches could be very large, and writing them as a single file is not a good idea.
I can see that this problem is addressed in the class apache_beam.io.parquetio._ParquetSink
, but I don't think it can be applied directly to the WriteToFiles
class, as I can't see how to fully manage the file handlers with it.
Upvotes: 1
Views: 2390
Reputation: 71
I faced a similar problem and ended up writing a ParquetSink
that can be used with WriteToFiles
. It batches the records in memory according to your configuration.
I've used this to create dynamic files in a batch process, depending on a field in the record. I assume it would also work with a streaming pipeline, although I haven't tested it.
You can find the code in this gist:
import apache_beam as beam
import pyarrow
import pyarrow.parquet
from apache_beam.io import fileio


class ParquetSink(fileio.FileSink):
    def __init__(self,
                 file_path_prefix,
                 schema,
                 row_group_buffer_size=64 * 1024 * 1024,
                 record_batch_size=1000,
                 codec='none',
                 use_deprecated_int96_timestamps=False,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 mime_type='application/x-parquet'):
        # Reuse Beam's internal Parquet sink for record buffering and batching.
        self._inner_sink = beam.io.parquetio._create_parquet_sink(
            file_path_prefix,
            schema,
            codec,
            row_group_buffer_size,
            record_batch_size,
            use_deprecated_int96_timestamps,
            file_name_suffix,
            num_shards,
            shard_name_template,
            mime_type
        )
        self._codec = codec
        self._schema = schema
        self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps

    def open(self, fh):
        self._pw = pyarrow.parquet.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)

    def write(self, record):
        self._inner_sink.write_record(self._pw, record)

    def flush(self):
        # Write out anything still buffered and close the writer so the
        # Parquet footer gets written.
        if len(self._inner_sink._buffer[0]) > 0:
            self._inner_sink._flush_buffer()
        if self._inner_sink._record_batches_byte_size > 0:
            self._inner_sink._write_batches(self._pw)
        self._pw.close()


def parquet_compatible_filenaming(suffix=None):
    # Reuse the default destination-based naming, but replace ':' with '.'
    # so the resulting file names stay compatible.
    def _inner(window, pane, shard_index, total_shards, compression, destination):
        return fileio.destination_prefix_naming(suffix)(
            window, pane, shard_index, total_shards, compression, destination).replace(":", ".")
    return _inner


def get_parquet_pipeline(pipeline_options, input, output):
    with beam.Pipeline(options=pipeline_options) as p:
        lines = (p
                 | 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
                 | 'Transform' >> beam.Map(lambda x: {'some_key': x['some_key'], 'raw': x})
                 | 'Write to Parquet' >> fileio.WriteToFiles(
                     path=str(output),
                     destination=lambda x: x["some_key"],
                     sink=lambda x: ParquetSink(
                         file_path_prefix=output,
                         file_name_suffix=".parquet",
                         codec="snappy",
                         schema=pyarrow.schema([
                             pyarrow.field("some_key", pyarrow.string()),
                             pyarrow.field("raw", pyarrow.string())
                         ])),
                     file_naming=parquet_compatible_filenaming(suffix=".parquet")))
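For completeness, a hypothetical invocation of the pipeline above; the runner and paths are placeholders for your own setup.

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options and locations; adjust the runner and buckets to your environment.
pipeline_options = PipelineOptions(runner='DirectRunner')
get_parquet_pipeline(pipeline_options,
                     input='gs://my-bucket/input/*.parquet',
                     output='gs://my-bucket/output/')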
Upvotes: 3
Reputation: 13902
The Parquet format is optimized for writing data in batches. Therefore it doesn't lend itself well to streaming, where you receive records one by one. In your example you're writing row by row into a Parquet file, which is very inefficient.
I'd recommend saving your data in a format that lends itself well to appending data row by row, and then having a regular job that moves this data in batches to Parquet files.
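For example, if the streaming side appends records as newline-delimited JSON, the regular compaction job could look roughly like this; the paths, schema and field names are placeholders.

import json

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions

def compact_to_parquet(input_pattern, output_prefix):
    # Placeholder schema; it must match the parsed JSON records.
    schema = pa.schema([('some_key', pa.string()), ('raw', pa.string())])
    with beam.Pipeline(options=PipelineOptions()) as p:
        _ = (p
             | 'ReadRows' >> beam.io.ReadFromText(input_pattern)
             | 'Parse' >> beam.Map(json.loads)
             | 'WriteParquet' >> beam.io.WriteToParquet(
                 output_prefix, schema, file_name_suffix='.parquet'))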
Or you can do what apache_beam.io.parquetio._ParquetSink
does: it keeps records in memory in a buffer and writes them out in batches every now and then. But with this approach you run the risk of losing the records in the buffer if your application crashes.
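A bare-bones sketch of that buffering pattern (not the actual _ParquetSink; the buffer size and the pandas conversion are arbitrary choices): anything still sitting in the buffer is lost if the worker crashes before flush() runs.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam.io import fileio

class BufferedParquetSink(fileio.FileSink):
    def __init__(self, schema, buffer_size=1000):
        self._schema = schema
        self._buffer_size = buffer_size

    def open(self, fh):
        self._writer = pq.ParquetWriter(fh, self._schema)
        self._buffer = []

    def write(self, record):
        # Buffer records and only write a row group once enough have accumulated.
        self._buffer.append(record)
        if len(self._buffer) >= self._buffer_size:
            self._write_buffer()

    def flush(self):
        # Write whatever is left and close the writer so the footer gets written.
        if self._buffer:
            self._write_buffer()
        self._writer.close()

    def _write_buffer(self):
        table = pa.Table.from_pandas(pd.DataFrame(self._buffer),
                                     schema=self._schema, preserve_index=False)
        self._writer.write_table(table)
        self._buffer = []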
Upvotes: 2