Reputation: 2306
What I am trying to do
I am using PyArrow to read some CSVs and convert them to Parquet. Some of the files I read have many columns and a high memory footprint (enough to crash the machine running the job). I am trying to chunk through the file while reading the CSV, similar to how pandas' read_csv works with chunksize.
For example, this is how the chunking code would work in pandas:
chunks = pandas.read_csv(data, chunksize=100, iterator=True)
# Iterate through chunks
for chunk in chunks:
    do_stuff(chunk)
I want to port similar functionality to Arrow.
What I have tried to do
I noticed that Arrow has ReadOptions, which includes a block_size parameter, and I thought maybe I could use it like:
# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
    input_file=input_buffer,
    read_options=arrow_csv.ReadOptions(
        use_threads=True,
        block_size=4096
    )
)
# Iterate through batches
for batch in arrow_table.to_batches():
    do_stuff(batch)
As this (block_size) does not seem to return an iterator, I am under the impression that this will still make Arrow read the entire table into memory, and thus recreate my problem.
Lastly, I am aware that I could first read the CSV using pandas, chunk through it, and then convert each chunk to an Arrow table (roughly the sketch below). But I am trying to avoid using pandas and only use Arrow.
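For context, the pandas-based workaround I would rather avoid looks roughly like this (just a sketch; the path and do_stuff are placeholders):
import pandas
import pyarrow as pa

# Read the CSV in pandas-sized chunks, then convert each chunk to an Arrow table
for chunk in pandas.read_csv('data.csv', chunksize=100):
    arrow_table = pa.Table.from_pandas(chunk)
    do_stuff(arrow_table)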
I am happy to provide additional information if needed
Upvotes: 9
Views: 6422
Reputation: 43817
The function you are looking for is pyarrow.csv.open_csv, which returns a pyarrow.csv.CSVStreamingReader. The size of the batches will be controlled by the block_size option you noticed. For a complete example:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv

in_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/nyctaxi_2010-01.csv.gz'
out_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/temp/iterative.parquet'

# Force these columns to strings so type inference stays consistent across batches
convert_options = pyarrow.csv.ConvertOptions()
convert_options.column_types = {
    'rate_code': pa.utf8(),
    'store_and_fwd_flag': pa.utf8()
}

writer = None
with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
    for next_chunk in reader:
        if next_chunk is None:
            break
        # Create the Parquet writer lazily, once the first batch's schema is known
        if writer is None:
            writer = pq.ParquetWriter(out_path, next_chunk.schema)
        next_table = pa.Table.from_batches([next_chunk])
        writer.write_table(next_table)
writer.close()
This example also highlights one of the challenges the streaming CSV reader introduces: it needs to return batches with consistent data types. However, when parsing CSV you typically need to infer the data types. In my example data, the first few MB of the file have integral values for the rate_code column, but somewhere in the middle of the file there is a non-integer value (* in this case) for that column. To work around this issue you can specify the types for those columns up front, as I am doing here.
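If you need finer control over memory use, you can also tune the batch size by passing a ReadOptions with block_size to open_csv. A minimal sketch, assuming a local input file data.csv (the path and block size here are placeholders):
import pyarrow as pa
import pyarrow.csv

# Each block of input (roughly block_size bytes) becomes one batch
read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)  # ~1 MiB blocks
convert_options = pyarrow.csv.ConvertOptions(
    column_types={'rate_code': pa.utf8(), 'store_and_fwd_flag': pa.utf8()}
)

with pyarrow.csv.open_csv('data.csv',
                          read_options=read_options,
                          convert_options=convert_options) as reader:
    for batch in reader:
        print(batch.num_rows)  # process each pyarrow.RecordBatch as it is read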
Upvotes: 16