Andy F

Reputation: 31

Is there any way to stream to a parquet file in Ruby?

I am trying to create an archival tool for a Ruby On Rails app.

To this end, I wish to store the data in Parquet files, ideally with one Parquet file per table per time interval.

However, I do not have the resources to hold an entire time interval's worth of data for every table in memory at once. I was hoping there would be some way to stream the data to a Parquet file in batches by maintaining a writer to the file and only closing it once all the data for the time interval had been written.

I am currently using the red-parquet and red-arrow gems, but unfortunately I have been unable to figure out how to do this. If anyone has any ideas or solutions, it would be appreciated.

I have looked at the documentation, as well as at the code in the Apache Arrow GitHub repository. Several of the tests open and use a Parquet::ArrowFileWriter, but I cannot find any documentation explaining how to use it, and when I try to use it in my own project, Parquet::ArrowFileWriter does not appear to exist.

I am using the latest versions of the red-arrow and red-parquet gems.

Upvotes: 3

Views: 598

Answers (1)

amoeba

Reputation: 4280

I think you're very close to having all the pieces needed for a solution. Here's an example that shows how to write one batch (an Arrow::RecordBatch) at a time to a Parquet file opened via Parquet::ArrowFileWriter:

require "parquet"

fields = [
  Arrow::Field.new("int32",  :int32),
]
schema = Arrow::Schema.new(fields)

# Create a RecordBatch with just four values.
# Note: Replace with your data-generating routine
#
# Returns an Arrow::RecordBatch
def make_batch(schema)
  columns = [
    Arrow::Int32Array.new([1, -2, 4, -8]),
  ]

  Arrow::RecordBatch.new(schema, columns[0].length, columns)
end

# Open a file for writing, then write out two record batches as Parquet row groups
writer = Parquet::ArrowFileWriter.open(schema, "streamed.parquet")

# Write out the first row group
batch = make_batch(schema)
writer.write_table(batch.to_table, batch.length)

# Write out the second row group
batch = make_batch(schema)
writer.write_table(batch.to_table, batch.length)

# Clean up
writer.close
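
In a Rails app you can keep memory bounded by combining this with ActiveRecord's find_in_batches, building one record batch per database batch. Here's a minimal sketch of that idea; the Event model and its id/name columns are hypothetical stand-ins, so adapt the schema and array construction to your own tables:

require "parquet"

# Hypothetical schema matching an "events" table with id and name columns
schema = Arrow::Schema.new([
  Arrow::Field.new("id",   :int64),
  Arrow::Field.new("name", :string),
])

writer = Parquet::ArrowFileWriter.open(schema, "events.parquet")

# find_in_batches loads only batch_size rows at a time, so memory use
# stays bounded regardless of how large the table is
Event.find_in_batches(batch_size: 10_000) do |records|
  columns = [
    Arrow::Int64Array.new(records.map(&:id)),
    Arrow::StringArray.new(records.map(&:name)),
  ]
  batch = Arrow::RecordBatch.new(schema, records.length, columns)

  # Each call appends another row group to the open file
  writer.write_table(batch.to_table, batch.length)
end

writer.close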

Depending on your situation, you probably want to wrap the writes in a begin/ensure block (Ruby's equivalent of try/catch/finally) so the writer gets closed even if an error is raised.
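
A minimal sketch of that cleanup pattern, reusing the schema and make_batch from above:

writer = Parquet::ArrowFileWriter.open(schema, "streamed.parquet")
begin
  batch = make_batch(schema)
  writer.write_table(batch.to_table, batch.length)
ensure
  # Runs whether or not the writes above raised an error
  writer.close
end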

Upvotes: 0
