I have essentially row-oriented/streaming data (Netflow) coming into my C++ application and I want to write the data to Parquet-gzip files.
Looking at the sample reader-writer.cc program in the parquet-cpp project, it seems that I can only feed the data to parquet-cpp in a columnar way:
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
...
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
// Write the Bool column
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
    bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
...
// Write the ... column
This seems to imply that I will need to buffer NUM_ROWS_PER_ROW_GROUP rows myself, then loop over them, to transfer them to parquet-cpp one column at a time. I'm hoping there is a better way, as this seems inefficient, since the data will need to be copied twice: once into my buffers, then again when feeding the data into parquet-cpp one column at a time.
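For concreteness, the buffered approach I'm hoping to avoid would look roughly like the sketch below, modeled on reader-writer.cc (the FlowRecord struct, batch size, and file name are placeholders I made up):
struct FlowRecord { int32_t sip; int32_t dip; };  // placeholder row type

int main() {
    // First copy: buffer one row group's worth of incoming rows.
    std::vector<FlowRecord> rows;
    for (int32_t i = 0; i < 500; i++) rows.push_back({i, i * 2});

    // Schema: two required INT32 columns.
    parquet::schema::NodeVector fields;
    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "sip", parquet::Repetition::REQUIRED, parquet::Type::INT32));
    fields.push_back(parquet::schema::PrimitiveNode::Make(
        "dip", parquet::Repetition::REQUIRED, parquet::Type::INT32));
    auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
        parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

    std::shared_ptr<arrow::io::FileOutputStream> out_file;
    arrow::io::FileOutputStream::Open("buffered.parquet", &out_file);
    auto file_writer = parquet::ParquetFileWriter::Open(out_file, schema);

    // Second copy: loop over the buffered rows once per column.
    parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(rows.size());
    auto* sip_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
    for (const FlowRecord& r : rows) {
        sip_writer->WriteBatch(1, nullptr, nullptr, &r.sip);
    }
    auto* dip_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
    for (const FlowRecord& r : rows) {
        dip_writer->WriteBatch(1, nullptr, nullptr, &r.dip);
    }
    file_writer->Close();
}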
Is there a way to get each row's data into parquet-cpp without having to buffer a bunch of rows first? The Apache Arrow project (which parquet-cpp uses) has a tutorial that shows how to convert row-wise data into an Arrow table. For each row of input data, the code appends to each column builder:
for (const data_row& row : rows) {
    ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
    ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));
}
I would like to do something like that with parquet-cpp. Is that possible?
Upvotes: 2
Views: 5749
I followed @xhochy's advice to use the Arrow APIs to populate an Arrow table as data arrives, and then write out the table using parquet-cpp's WriteTable() method. I set GZIP as the default compression, but specified SNAPPY for the second field.
#include <iostream>
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/io/file.h"
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

int main() {
    arrow::Int32Builder sip_builder(arrow::default_memory_pool());
    arrow::Int32Builder dip_builder(arrow::default_memory_pool());
    for (size_t i = 0; i < 1000; i++) {  // simulate row-oriented incoming data
        sip_builder.Append(i * 100);
        dip_builder.Append(i * 10 + i);
    }
    std::shared_ptr<arrow::Array> sip_array;
    sip_builder.Finish(&sip_array);
    std::shared_ptr<arrow::Array> dip_array;
    dip_builder.Finish(&dip_array);
    std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
        arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
        arrow::field("dip", arrow::int32(), false)
    };
    auto schema = std::make_shared<arrow::Schema>(schema_definition);
    std::shared_ptr<arrow::Table> arrow_table;
    MakeTable(schema, {sip_array, dip_array}, &arrow_table);
    std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
    arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
    parquet::WriterProperties::Builder props_builder;
    props_builder.compression(parquet::Compression::GZIP);            // default for all columns
    props_builder.compression("dip", parquet::Compression::SNAPPY);   // per-column override
    auto props = props_builder.build();
    parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
                               file_output_stream, sip_array->length(), props);
    std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1 <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
Rows: 1000
Column 0, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 99900, Min: 0
Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 3109
Column 1, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 10989, Min: 0
Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 5316
The code above writes out one row group for the entire table/file. Depending on how many rows of data you have, this might not be ideal, as too many rows can result in a "fallback to plain encoding" (see Ryan Blue's presentation, slides 31-34). To write multiple row groups per table/file, set the chunk_size argument smaller (below I divide by 2 to get two row groups per table/file):
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
                           file_output_stream, sip_array->length() / 2, props);
This is still not ideal. All of the data for a file must be buffered/stored in the Arrow table before calling parquet::arrow::WriteTable(), since that function opens and closes the file. I want to write multiple row groups per file, but I only want to buffer/store one or two row groups' worth of data at a time in memory. The following code accomplishes that. It is based on the code in parquet/arrow/writer.cc:
#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(),
                                 arrow_output_stream, props,
                                 parquet::arrow::default_arrow_writer_properties(), &writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length() / 2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
(The output below is from a run with 1,000,000 rows per table rather than the 1,000 in the snippet above, hence the 2,000,000 total rows.)
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3 <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
Rows: 500000
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
Rows: 500000
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
Rows: 1000000
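For completeness, the streaming pattern I ended up with looks roughly like the sketch below. MoreDataAvailable(), NextSip(), and NextDip() are hypothetical stand-ins for the incoming Netflow feed; schema, props, and arrow_output_stream come from the snippets above, and Statuses are unchecked for brevity:
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(),
                                 arrow_output_stream, props,
                                 parquet::arrow::default_arrow_writer_properties(), &writer);
arrow::Int32Builder sip_builder(arrow::default_memory_pool());
arrow::Int32Builder dip_builder(arrow::default_memory_pool());
while (MoreDataAvailable()) {                        // hypothetical input predicate
    // Buffer only one row group's worth of rows at a time.
    for (int64_t i = 0; i < 500000 && MoreDataAvailable(); i++) {
        sip_builder.Append(NextSip());               // hypothetical per-row accessors
        dip_builder.Append(NextDip());
    }
    std::shared_ptr<arrow::Array> sip_array, dip_array;
    sip_builder.Finish(&sip_array);                  // Finish() resets the builder for reuse
    dip_builder.Finish(&dip_array);
    std::shared_ptr<arrow::Table> batch;
    MakeTable(schema, {sip_array, dip_array}, &batch);
    writer->WriteTable(*batch, sip_array->length()); // one row group per batch
}
writer->Close();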
Upvotes: 4
You will never be able to avoid buffering entirely, since the data has to be transformed from a row-wise to a columnar representation. The best path at the time of writing is to construct Apache Arrow tables that are then fed into parquet-cpp.
parquet-cpp provides special Arrow APIs that can directly operate on these tables, mostly without any additional data copies. You can find the API in parquet/arrow/reader.h and parquet/arrow/writer.h.
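For the read side, a minimal sketch against that API (the file name is assumed, and Statuses are unchecked for brevity):
#include <arrow/io/file.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>

int main() {
    // Open the Parquet file and read it back as a single Arrow table.
    std::shared_ptr<arrow::io::ReadableFile> infile;
    arrow::io::ReadableFile::Open("test.parquet", &infile);
    std::unique_ptr<parquet::arrow::FileReader> reader;
    parquet::arrow::OpenFile(infile, ::arrow::default_memory_pool(), &reader);
    std::shared_ptr<arrow::Table> table;
    reader->ReadTable(&table);
}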
There is an optimal, but yet-to-be-implemented, solution that could save some additional bytes of buffering. While it might save you some memory, several of its steps still need to be implemented by someone (feel free to contribute them or ask for help on implementing them), so you are probably fine with using the Apache Arrow based API.
Upvotes: 6