alt-f4

Reputation: 2306

How can I chunk through a CSV using Arrow?

What I am trying to do

I am using PyArrow to read some CSVs and convert them to Parquet. Some of the files I read have many columns and a high memory footprint (enough to crash the machine running the job). I am trying to chunk through the file while reading the CSV, in a similar way to how Pandas read_csv with chunksize works.

For example this is how the chunking code would work in pandas:

import pandas

# Read the CSV lazily, yielding 100-row DataFrames
chunks = pandas.read_csv(data, chunksize=100, iterator=True)

# Iterate through chunks
for chunk in chunks:
    do_stuff(chunk)

I want to replicate similar functionality with Arrow.

What I have tried to do

I noticed that Arrow has ReadOptions, which includes a block_size parameter, and I thought maybe I could use it like:

import pyarrow.csv as arrow_csv

# Reading in-memory CSV file
arrow_table = arrow_csv.read_csv(
    input_file=input_buffer,
    read_options=arrow_csv.ReadOptions(
        use_threads=True,
        block_size=4096
    )
)

# Iterate through batches
for batch in arrow_table.to_batches():
    do_stuff(batch)

Since read_csv does not return an iterator, I am under the impression that, even with block_size set, Arrow will still read the entire table into memory and thus recreate my problem.

Lastly, I am aware that I could first read the CSV with Pandas, chunk through it, and then convert each chunk to an Arrow table, but I am trying to avoid Pandas and use only Arrow.
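For reference, that workaround would look roughly like this (converting each chunk with pyarrow.Table.from_pandas; data and do_stuff are the same placeholders as above):

import pandas as pd
import pyarrow as pa

# The workaround I'd rather avoid: chunk with Pandas, then convert each chunk to Arrow
for chunk in pd.read_csv(data, chunksize=100):
    arrow_table = pa.Table.from_pandas(chunk)
    do_stuff(arrow_table)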

I am happy to provide additional information if needed.

Upvotes: 9

Views: 6422

Answers (1)

Pace

Reputation: 43817

The function you are looking for is pyarrow.csv.open_csv, which returns a pyarrow.csv.CSVStreamingReader. The size of the batches is controlled by the block_size option you noticed. For a complete example:

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv

in_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/nyctaxi_2010-01.csv.gz'
out_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/temp/iterative.parquet'

# Fix the types of ambiguous columns up front so every batch is parsed
# with a consistent schema (see the note below)
convert_options = pyarrow.csv.ConvertOptions()
convert_options.column_types = {
    'rate_code': pa.utf8(),
    'store_and_fwd_flag': pa.utf8()
}

writer = None
with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
    # Each chunk is a RecordBatch; its size is governed by ReadOptions.block_size
    for next_chunk in reader:
        if next_chunk is None:
            break
        if writer is None:
            # Create the Parquet writer lazily, once the schema is known
            writer = pq.ParquetWriter(out_path, next_chunk.schema)
        next_table = pa.Table.from_batches([next_chunk])
        writer.write_table(next_table)
writer.close()

This example also highlights one of the challenges the streaming CSV reader introduces: it needs to return batches with consistent data types, but when parsing CSV you typically need to infer those types. In my example data the first few MB of the file have integral values for the rate_code column, and somewhere in the middle of the file there is a non-integer value (* in this case) for that column. To work around this issue you can specify the column types up front, as I am doing here.
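If you want to bound the size of each batch explicitly, the block_size option from your question can be passed to open_csv through ReadOptions. A minimal sketch (data.csv and do_stuff are placeholders, not part of my example above):

import pyarrow.csv

# block_size (in bytes) limits how much of the file is read and parsed per
# batch, so peak memory scales with the block size rather than the file size
read_options = pyarrow.csv.ReadOptions(block_size=1 << 20)  # ~1 MiB chunks

with pyarrow.csv.open_csv('data.csv', read_options=read_options) as reader:
    for batch in reader:       # each batch is a pyarrow.RecordBatch
        do_stuff(batch)        # placeholder for per-chunk processing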

Upvotes: 16
