Reputation: 806
I am attempting to read data from a large (300GB) newline-delimited JSON file, extract specific fields of interest, and write them to a parquet file. This question is a follow-up to my previous SO question, which has more background and describes the structure of the data I'm working with.
Each line/JSON object is independent of the others, so I would have imagined this could be handled in a streaming fashion, processing the file (which is too large to fit in memory) in chunks.
The code that does the actual scan, collect and write is very simple:
import polars as pl

# define the schema...
pl.scan_ndjson(
    'data/input/myjson.jsonl',
    schema=prschema
)\
.collect(streaming=True)\
.write_parquet(
    'data/output/myparquet.parquet',
    compression='snappy',
    use_pyarrow=True
)
However, as I work with increasingly larger subsets of my final file, I see that memory consumption increases linearly with input file size.
If I check the query plan using explain(streaming=True), I see that streaming is NOT being used:
Anonymous SCAN
PROJECT */6 COLUMNS
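For reference, the plan above comes from a check along these lines (a sketch; prschema and the input path are the same as in the snippet above). A plan that the streaming engine can actually run would typically include a STREAMING section, which is absent here:
import polars as pl

# Build the lazy scan and print the optimized plan with the streaming
# engine requested, without executing the query.
lf = pl.scan_ndjson('data/input/myjson.jsonl', schema=prschema)
print(lf.explain(streaming=True))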
So my question is, why does streaming not appear to work for this seemingly straightforward read/write use case?
UPDATED
Using sink_parquet instead of write_parquet does not work (in fact it's what I had originally tried). To be sure it wasn't due to the complex nested structure of my JSON files, I even tried a very simplified version that just attempts to write two scalar (non-nested) fields:
pl.scan_ndjson('data/input/myjson.jsonl')\
    .select('id', 'standing')\
    .sink_parquet(
        'data/output/myparquet.parquet',
        compression='snappy'
    )
This throws:
InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Snappy, statistics: false, row_group_size: None, data_pagesize_limit: None, maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'
Upvotes: 1
Views: 719
Reputation: 13867
Streaming only means that the input is pulled in batches. This is useful if you're doing something like a filter or aggregation, where it's possible to compute results without having all of the data in memory ahead of time. But the result of collecting is still a DataFrame that resides entirely in memory.
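For example (a sketch using the id/standing columns from the simplified snippet in the question; adjust to your own schema and Polars version), an aggregation can be computed batch by batch, so the collected result stays small even though the input does not fit in memory:
import polars as pl

# The scan is pulled in batches and only per-group counters are kept,
# so peak memory stays low; the collected result is a small DataFrame.
counts = (
    pl.scan_ndjson('data/input/myjson.jsonl')
      .group_by('standing')
      .agg(pl.len().alias('n'))
      .collect(streaming=True)
)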
I believe what you want is sink_parquet, which is the lazy version of write_parquet:
pl.scan_ndjson(...).sink_parquet(...)
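Applied to the simplified example from the question, that would look roughly like this (same paths and column names as above):
import polars as pl

# Lazily scan the NDJSON, keep only the two columns of interest and
# stream the result straight to parquet, without materializing the
# whole DataFrame in memory.
pl.scan_ndjson('data/input/myjson.jsonl')\
    .select('id', 'standing')\
    .sink_parquet('data/output/myparquet.parquet', compression='snappy')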
Upvotes: 1