Mikko Ohtamaa

Reputation: 83828

Incrementally writing Parquet dataset from Python

I am writing a larger-than-RAM dataset out from my Python application, basically dumping data from SQLAlchemy to Parquet. My solution was inspired by this question. Even after increasing the batch size as hinted here, I am facing these issues:

My assumption is that this is because the ParquetWriter metadata management becomes expensive as the number of rows increases. I am thinking that I should switch to datasets, which would allow the writer to close the file in the middle of processing and flush out the metadata.

My question is

My distilled code:


import os
import time

import psutil
import pyarrow as pa
import pyarrow.parquet as pq

with pq.ParquetWriter(
    fname,
    Candle.to_pyarrow_schema(small_candles),
    compression='snappy',
    allow_truncated_timestamps=True,
    version='2.0',  # Highest available schema
    data_page_version='2.0',  # Highest available schema
) as writer:

    def writeout():
        nonlocal data
        duration = time.time() - stats["started"]
        throughput = stats["candles_processed"] / duration
        logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughput)
        writer.write_table(
            pa.Table.from_pydict(
                    data,
                    writer.schema
            )
        )
        data = dict.fromkeys(data.keys(), [])
        process = psutil.Process(os.getpid())
        logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())

    # Use massive yield_per() or otherwise we are leaking memory
    for item in query.yield_per(100_000):
        frame = construct_frame(row_type, item)
        for key, value in frame.items():
            data[key].append(value)

        stats["candles_processed"] += 1

        # Do regular checkpoints to avoid out of memory
        # and to log the progress to the console
        # For fine tuning Parquet writer see
        # https://issues.apache.org/jira/browse/ARROW-10052
        if stats["candles_processed"] % 100_000 == 0:
            writeout()

Upvotes: 3

Views: 2056

Answers (1)

Mikko Ohtamaa

Reputation: 83828

In this case, the reason was the incorrect use of Python lists and dicts as a working buffer, as pointed out by @0x26res.

After making sure the dictionary of lists is cleared correctly, the memory consumption issues become negligible.
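
For illustration, here is a minimal sketch of the pitfall, assuming the problematic line is the `dict.fromkeys(data.keys(), [])` reset in `writeout()`. `dict.fromkeys` assigns the same list object to every key, so after the first flush every `data[key].append(value)` ends up in one shared list that grows by one element per column per row. The column names below are hypothetical placeholders, not the real candle schema.

```python
# Hypothetical column names standing in for the real candle schema.
columns = ["timestamp", "open", "close"]

# Buggy reset: dict.fromkeys() reuses ONE list object for every key.
shared = dict.fromkeys(columns, [])
shared["timestamp"].append(1)
# Every key reports the appended value because all keys alias the same list.
shared_lengths = {key: len(values) for key, values in shared.items()}

# Correct reset: the dict comprehension builds a fresh list for each key.
fresh = {key: [] for key in columns}
fresh["timestamp"].append(1)
# Only the key that was appended to has grown.
fresh_lengths = {key: len(values) for key, values in fresh.items()}

print(shared_lengths)
print(fresh_lengths)
```

In the code from the question, this would mean replacing the reset with something like `data = {key: [] for key in data}`.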

Upvotes: 3
