Dino Burger
Dino Burger

Reputation: 3

Use batches to add a column with pyarrow

I am currently loading a table, calculating a new column, adding the column to the table and save the table to disk, which all works fine. The question: I tried to this batch wise, but get the error message:

AttributeError: 'pyarrow.lib.RecordBatch' object has no attribute 'append_column'

Does anybody know if there is a way to do this?

Code that works, but without batches:

import pyarrow.parquet as pq
import pyarrow as pa
 
candidates = pq.ParquetFile('input.parquet').read()
result = []
for row in candidates.to_pylist():
    row_result = function(row)
    result.append(row_result)
candidates_with_result = candidates.append_column('new_column_name', pa.array(result))
pq.write_table(candidates_with_result, 'output.parquet')

The code that’s not working, but the general idea:

candidates = pq.ParquetFile('input.parquet').read()
for batch in candidates.to_batches():
    result = []
    for row in batch.to_pylist():
        row_result = function(row)
        result.append(row_result)
    batch_with_results = batch.append_column('new_column_name', pa.array(result))
    pq.write_table(batch_with_results, 'output.parquet')

So any help on how to apply a function to an arrow table batchwise is greatly appreciated!

Thanks

Dino

Upvotes: 0

Views: 1707

Answers (2)

Dino Burger
Dino Burger

Reputation: 3

Thanks to 0x26res for the answer, here I added the ParquetWriter to append correctly:

# before I know the schema, I need to calculate one batch
candidates = pq.ParquetFile('input.parquet').read()
writer = None
for batch in candidates.to_batches():
    result = []
    for row in batch.to_pylist():
        row_result = function(row)
        result.append(row_result)
    new_column = pa.array(result)
    batch_with_results = pa.RecordBatch.from_arrays(
        batch.columns + [new_column],
        schema=batch.schema.append(pa.field("new_column_name", new_column.type))
    )
    if not writer:
        writer = pq.ParquetWriter("output.parquet", batch_with_results.schema)
    writer.write_batch(batch_with_results)
writer.close()

Upvotes: 0

0x26res
0x26res

Reputation: 13902

It's not supported out of the box, but you can do something like this:

    new_column = pa.array(result)
    batch_with_results = pa.RecordBatch.from_arrays(
        batch.columns + [new_column],
        schema=batch.schema.append(pa.field("new_column_name", new_column.type))
    )

Upvotes: 2

Related Questions