Reputation: 610
Context: I am using datafusion to build a data validator for CSV file input.
Requirement: I want the output report to include the row number where each error occurred. In pandas, I can add a row index and use it for this purpose. Is there a way to achieve a similar result in datafusion?
Upvotes: 3
Views: 852
Reputation: 444
Using LazyFrame's fn with_row_index().
Cargo.toml
[package]
name = "add_index"
version = "0.1.0"
edition = "2024"
[dependencies.polars]
version = "0.46.0"
features = [
    "csv",
    "lazy", # Lazy API
]
main.rs
use polars::prelude::*;
use std::{error::Error, io::Cursor};

fn main() -> Result<(), Box<dyn Error>> {
    // use an in-memory representation of the CSV
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the CSV into a DataFrame
    let df_1 = CsvReader::new(csv).finish()?;

    // print df_1
    dbg!(&df_1);

    let df_2 = df_1
        .lazy()
        // add the row index first, so rows keep their original position after filtering
        .with_row_index("idx", Some(0u32))
        .filter(col("b").is_not_null())
        .collect()?;

    // print df_2
    dbg!(&df_2);

    Ok(())
}
Output after cargo run:
[src/main.rs:20:5] &df_1 = shape: (6, 2)
┌───────┬──────┐
│ a ┆ b │
│ --- ┆ --- │
│ str ┆ i64 │
╞═══════╪══════╡
│ 1 ┆ 2 │
│ 1 ┆ 3 │
│ 4 ┆ 2 │
│ 2 ┆ 6 │
│ 3 ┆ 7 │
│ ┆ null │
└───────┴──────┘
[src/main.rs:29:5] &df_2 = shape: (5, 3)
┌─────┬───────┬─────┐
│ idx ┆ a ┆ b │
│ --- ┆ --- ┆ --- │
│ u32 ┆ str ┆ i64 │
╞═════╪═══════╪═════╡
│ 0 ┆ 1 ┆ 2 │
│ 1 ┆ 1 ┆ 3 │
│ 2 ┆ 4 ┆ 2 │
│ 3 ┆ 2 ┆ 6 │
│ 4 ┆ 3 ┆ 7 │
└─────┴───────┴─────┘
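Note that the pipeline above calls with_row_index before filter, so the rows that survive keep the position they had in the input. For a validation report, that ordering is the important part. Here is a minimal sketch of the same idea in plain Rust without polars. The validate function, the RowError struct, and the "every field must be an integer" rule are hypothetical and only there for illustration; the line number assumes a single header line and no quoted multi-line fields.

```rust
/// A row that failed validation, tagged with its original position.
#[derive(Debug)]
struct RowError {
    idx: usize,  // zero-based data-row index, like the `idx` column above
    line: usize, // one-based line in the file, assuming one header line
    reason: String,
}

/// Hypothetical validator: every field must parse as an i64.
fn validate(csv: &str) -> Vec<RowError> {
    csv.lines()
        .skip(1) // skip the header line
        .enumerate() // number the rows BEFORE filtering anything out
        .filter_map(|(idx, row)| {
            // find the first field that is not an integer, if any
            let bad = row.split(',').find(|f| f.trim().parse::<i64>().is_err())?;
            Some(RowError {
                idx,
                line: idx + 2, // +1 for the header, +1 for one-based counting
                reason: format!("not an integer: {bad:?}"),
            })
        })
        .collect()
}

fn main() {
    let csv = "a,b\n1,2\nx,3\n4,2\n2,\n";
    for e in validate(csv) {
        println!("row {} (line {}): {}", e.idx, e.line, e.reason);
    }
}
```

If the numbering happened after the filter, the reported positions would refer to the filtered data rather than to the file the user has to fix.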
Upvotes: 0
Reputation: 14730
Given Ian Graham's advice to check out polars, I thought I'd give an example of how this could be achieved in polars as well:
use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {
    // use an in-memory representation of the CSV
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the CSV into a DataFrame
    let mut df = CsvReader::new(csv).finish()?;

    // create the index column based on the DataFrame's height
    // note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
    let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
    idx.rename("idx");

    // add the index column to the DataFrame
    df.insert_at_idx(0, idx.into_inner().into_series())?;

    // print output
    dbg!(df);

    Ok(())
}
Outputs:
+-----+-----+-----+
| idx | a | b |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0 | 1 | 2 |
+-----+-----+-----+
| 1 | 1 | 3 |
+-----+-----+-----+
| 2 | 4 | 2 |
+-----+-----+-----+
| 3 | 2 | 6 |
+-----+-----+-----+
| 4 | 3 | 7 |
+-----+-----+-----+
Upvotes: 2
Reputation: 376
There doesn't appear to be any easy way to do this within datafusion after opening the CSV file. But you could instead open the CSV file directly with arrow, produce a new RecordBatch that incorporates the index column, and then feed this to datafusion using a MemTable. Here's an example, assuming we are only processing one batch:
use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;
use std::fs::File;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // schema of the CSV file as it exists on disk
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);

    // read the first (and, by assumption, only) batch with arrow
    let file = File::open("tests/example.csv")?;
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap()?;

    // build the index column, then copy the two data columns
    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    let a_array = Int64Array::from(
        batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec(),
    );
    let b_array = Int64Array::from(
        batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec(),
    );

    // new schema with the index column in front
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    let new_batch = RecordBatch::try_new(
        Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)],
    )?;

    // hand the indexed batch to datafusion as an in-memory table
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    let mut ctx = ExecutionContext::new();

    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;
    let results = df.collect().await?;
    print_batches(&results).unwrap();

    // do whatever you need to do

    Ok(())
}
My example.csv looks like this ...
a,b
1,2
1,3
4,2
2,6
3,7
And the output should be ...
+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0 | 1 | 2 |
| 1 | 1 | 3 |
| 2 | 4 | 2 |
| 3 | 2 | 6 |
| 4 | 3 | 7 |
+-----+---+---+
Though if you're really just in search of a crate with functionality like pandas in Python, I'd urge you to check out polars.
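The arrow example above assumes a single batch. If the file is larger than the batch size, the index has to continue from one batch to the next instead of restarting at 0. A minimal sketch of that bookkeeping in plain Rust, assuming the batches arrive in file order; batch_indices is a hypothetical helper and not part of arrow or datafusion:

```rust
/// Build one index vector per batch, continuing the count across batches.
/// `batch_rows` holds the number of rows of each batch, in read order.
fn batch_indices(batch_rows: &[usize]) -> Vec<Vec<u32>> {
    let mut offset: u32 = 0; // rows seen in all earlier batches
    batch_rows
        .iter()
        .map(|&rows| {
            let rows = rows as u32;
            // this batch covers offset..offset + rows
            let idx: Vec<u32> = (offset..offset + rows).collect();
            offset += rows;
            idx
        })
        .collect()
}

fn main() {
    // two batches: three rows, then two rows
    let indices = batch_indices(&[3, 2]);
    println!("{indices:?}"); // prints [[0, 1, 2], [3, 4]]
}
```

Each inner vector could then be passed to UInt32Array::from in place of the (0..length) range, once per batch.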
Upvotes: 2