tsorn

Reputation: 3625

Writing a Vec of Rows to a Parquet file

I know how to read a Parquet file into a Vec<Row>.

extern crate parquet;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{fs, sync::Arc};
use parquet::column::writer::ColumnWriter;
use parquet::{
    file::{
        properties::WriterProperties,
        writer::{FileWriter, SerializedFileWriter},
    },
    schema::parser::parse_message_type,
    schema::types::TypePtr,
};
use parquet::record::Row;
use std::fs::File;
use std::path::Path;

fn read_parquet(in_path: &Path) -> (Vec<Row>, TypePtr) {
    // Read Parquet input file. Return a vector of rows and the Schema
    let file = File::open(in_path).unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
    let row_iter = reader.get_row_iter(None).unwrap();
    let num_rows = reader.metadata().file_metadata().num_rows();
    let rows: Vec<Row> = row_iter.collect();
    println!("num rows: {}", num_rows);

    let schema = reader.metadata().file_metadata().schema_descr().root_schema_ptr();
    (rows, schema)
}

Now, how do I write the same data back out? I'm using the parquet crate.

fn to_parquet(data: Vec<Row>, schema: TypePtr, out_path: &Path) {
    let props = Arc::new(WriterProperties::builder().build());
    let file = fs::File::create(&out_path).unwrap();
    let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
    // Now what?
}

Upvotes: 4

Views: 4862

Answers (2)

Contango

Reputation: 80272

Polars tends to work well for writing Parquet files. See docs for Rust.

use polars::prelude::*;
use std::fs::File;

fn main() -> Result<(), PolarsError> {
    let data = vec![1, 2, 3, 4, 5]; // Vector of integers.
    let s = Series::new("data", data); // Create series from the vector.
    let mut df = DataFrame::new(vec![s])?; // Create a DataFrame from series.
    let file = File::create("example.parquet").expect("could not create file");
    ParquetWriter::new(file).finish(&mut df)?; // Write the DataFrame to the Parquet file.
    Ok(())
}

Tested with Rust v1.75.0 and this Cargo.toml:

[dependencies]
polars = { version = "0.36.2", features = ["parquet", "polars-io"] }
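
To sanity-check the output, the file can be read back with Polars' ParquetReader. A minimal sketch, assuming the same feature flags as in the Cargo.toml above:

use polars::prelude::*;
use std::fs::File;

fn main() -> Result<(), PolarsError> {
    // Open the file written above and load it back into a DataFrame.
    let file = File::open("example.parquet")?;
    let df = ParquetReader::new(file).finish()?;
    println!("{}", df); // Expect a single "data" column holding 1 through 5.
    Ok(())
}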

Upvotes: 3

user1516867

Reputation: 331

Here is a quick example that works:

    use std::{fs, path::Path, sync::Arc};
    use parquet::{column::writer::ColumnWriter, data_type::ByteArray, file::{
        properties::WriterProperties,
        writer::{FileWriter, SerializedFileWriter},
    }, schema::parser::parse_message_type};

    #[test]
    fn sample_test() {
        let path = Path::new("./sample.parquet");

        let message_type = "
            message schema {
                REQUIRED INT32 b;
                REQUIRED BINARY msg (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());
        let file = fs::File::create(&path).unwrap();

        let mut values_written: i64 = 0; // counts values written across both columns
        let data = vec![
            (10, "A"),
            (20, "B"),
            (30, "C"),
            (40, "D"),
        ];

        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
        for (key, value) in data {
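            // Note: each loop iteration opens its own row group (one row per group);
            // a single-row-group variant is sketched after this example.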
            let mut row_group_writer = writer.next_row_group().unwrap();
            let id_writer = row_group_writer.next_column().unwrap();
            if let Some(mut writer) = id_writer {
                match writer {
                    ColumnWriter::Int32ColumnWriter(ref mut typed) => {
                        let values = vec![key];
                        values_written +=
                            typed.write_batch(&values[..], None, None).unwrap() as i64;
                    },
                    _ => {
                        unimplemented!();
                    }
                }
                row_group_writer.close_column(writer).unwrap();
            }
            let data_writer = row_group_writer.next_column().unwrap();
            if let Some(mut writer) = data_writer {
                match writer {
                    ColumnWriter::ByteArrayColumnWriter(ref mut typed) => {
                        let values = ByteArray::from(value);
                        values_written += typed.write_batch(&[values], None, None).unwrap() as i64;
                    }
                    _ => {
                        unimplemented!();
                    }
                }
                row_group_writer.close_column(writer).unwrap();
            }
            writer.close_row_group(row_group_writer).unwrap();
        }
        writer.close().unwrap();

        println!("Wrote {}", rows);

        let bytes = fs::read(&path).unwrap();
        assert_eq!(&bytes[0..4], &[b'P', b'A', b'R', b'1']);
    }

The key is to use the RowGroupWriter, which you get by calling writer.next_row_group().
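
Note that the loop above opens a separate row group for every (key, value) pair. Since write_batch accepts a whole slice of values per column, a Vec of rows can also go into a single row group. Here is a minimal sketch against the same parquet API; write_single_row_group is a hypothetical helper, not something the crate provides:

    use std::{fs, path::Path, sync::Arc};
    use parquet::{column::writer::ColumnWriter, data_type::ByteArray, file::{
        properties::WriterProperties,
        writer::{FileWriter, SerializedFileWriter},
    }, schema::parser::parse_message_type};

    // Hypothetical helper: writes two parallel column slices as one row group.
    fn write_single_row_group(path: &Path, keys: &[i32], msgs: &[&str]) {
        let message_type = "
            message schema {
                REQUIRED INT32 b;
                REQUIRED BINARY msg (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(WriterProperties::builder().build());
        let file = fs::File::create(path).unwrap();
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();

        // One row group for all rows; each column receives its full slice at once.
        let mut row_group_writer = writer.next_row_group().unwrap();
        while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
            match col_writer {
                ColumnWriter::Int32ColumnWriter(ref mut typed) => {
                    typed.write_batch(keys, None, None).unwrap();
                }
                ColumnWriter::ByteArrayColumnWriter(ref mut typed) => {
                    let values: Vec<ByteArray> =
                        msgs.iter().map(|&s| ByteArray::from(s)).collect();
                    typed.write_batch(&values, None, None).unwrap();
                }
                _ => unimplemented!(),
            }
            row_group_writer.close_column(col_writer).unwrap();
        }
        writer.close_row_group(row_group_writer).unwrap();
        writer.close().unwrap();
    }

Called as write_single_row_group(Path::new("./sample.parquet"), &[10, 20, 30, 40], &["A", "B", "C", "D"]), it produces the same four rows as above, just in one row group. Row groups are Parquet's unit of column chunking, so a few large groups generally compress and scan better than one group per row.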

Once the sample.parquet file is created, you can check its contents by running:

$ parquet-read ./sample.parquet
{b: 10, msg: "A"}
{b: 20, msg: "B"}
{b: 30, msg: "C"}
{b: 40, msg: "D"}

See the arrow-rs README.md for details about the parquet-read binary; in short, you can install it as follows:

$ git clone https://github.com/apache/arrow-rs
$ cd arrow-rs/parquet
$ cargo install --path . --features cli

Upvotes: 9
