Reputation: 3
I am currently loading a table, calculating a new column, adding the column to the table, and saving the table to disk, which all works fine. The question: I tried to do this batch-wise, but I get the error message:
AttributeError: 'pyarrow.lib.RecordBatch' object has no attribute 'append_column'
Does anybody know if there is a way to do this?
Code that works, but without batches:
import pyarrow.parquet as pq
import pyarrow as pa

candidates = pq.ParquetFile('input.parquet').read()
result = []
for row in candidates.to_pylist():
    row_result = function(row)
    result.append(row_result)
candidates_with_result = candidates.append_column('new_column_name', pa.array(result))
pq.write_table(candidates_with_result, 'output.parquet')
The code that's not working, but it shows the general idea:
candidates = pq.ParquetFile('input.parquet').read()
for batch in candidates.to_batches():
    result = []
    for row in batch.to_pylist():
        row_result = function(row)
        result.append(row_result)
    batch_with_results = batch.append_column('new_column_name', pa.array(result))
    pq.write_table(batch_with_results, 'output.parquet')
So any help on how to apply a function to an Arrow table batch-wise is greatly appreciated!
Thanks
Dino
Upvotes: 0
Views: 1707
Reputation: 3
Thanks to 0x26res for the answer. Here I added a ParquetWriter so the batches are appended to the output file correctly:
# the output schema is only known after the first batch has been processed,
# so the ParquetWriter is created lazily inside the loop
candidates = pq.ParquetFile('input.parquet').read()
writer = None
for batch in candidates.to_batches():
    result = []
    for row in batch.to_pylist():
        row_result = function(row)
        result.append(row_result)
    new_column = pa.array(result)
    batch_with_results = pa.RecordBatch.from_arrays(
        batch.columns + [new_column],
        schema=batch.schema.append(pa.field("new_column_name", new_column.type))
    )
    if not writer:
        writer = pq.ParquetWriter("output.parquet", batch_with_results.schema)
    writer.write_batch(batch_with_results)
writer.close()
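A possible refinement, not part of the original answer: reading the file with ParquetFile('input.parquet').read() still loads the whole table into memory before splitting it into batches. If the point of batching is to bound memory, ParquetFile.iter_batches() can stream record batches straight from disk instead. A minimal sketch under that assumption, reusing the hypothetical function(row) from the question:

import pyarrow as pa
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile('input.parquet')
writer = None
for batch in parquet_file.iter_batches(batch_size=10_000):  # stream batches from disk
    result = [function(row) for row in batch.to_pylist()]
    new_column = pa.array(result)
    batch_with_results = pa.RecordBatch.from_arrays(
        batch.columns + [new_column],
        schema=batch.schema.append(pa.field("new_column_name", new_column.type))
    )
    if writer is None:
        # create the writer once the output schema is known
        writer = pq.ParquetWriter("output.parquet", batch_with_results.schema)
    writer.write_batch(batch_with_results)
if writer is not None:
    writer.close()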
Upvotes: 0
Reputation: 13902
It's not supported out of the box, but you can do something like this:
new_column = pa.array(result)
batch_with_results = pa.RecordBatch.from_arrays(
    batch.columns + [new_column],
    schema=batch.schema.append(pa.field("new_column_name", new_column.type))
)
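If staying with the Table API is more convenient, the batch can also be wrapped in a single-batch Table, since pyarrow.Table does have append_column. A short alternative sketch, not from the answer itself, reusing the batch, result, and writer variables from the batch loop above:

# wrap the single batch in a Table so Table.append_column can be used
table_with_results = pa.Table.from_batches([batch]).append_column(
    'new_column_name', pa.array(result)
)
writer.write_table(table_with_results)  # ParquetWriter also accepts whole tables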
Upvotes: 2