Reputation: 83
I have my data in ADLS Gen2. One table is over 3 TB and is partitioned by event_date, event_type and sample_freq. Since the data is that big, I naturally want to read it partition by partition rather than fetch all of it into memory.
How would I read the data by partitions using Rust? So far I have tried delta-rs, and I managed to get the file URIs for a partition:
use std::collections::HashMap;

use deltalake::delta::open_table_with_storage_options;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut azure_storage_options = HashMap::new();
    azure_storage_options.insert(
        "azure_storage_account_name".to_string(),
        "name".to_string(),
    );
    azure_storage_options.insert(
        "azure_storage_account_key".to_string(),
        "key".to_string(),
    );

    let path = "az://name/path/to_data/";
    let table = open_table_with_storage_options(path, azure_storage_options)
        .await
        .unwrap();

    /* Get files of particular partition */
    let files = table.get_file_uris_by_partitions(&[deltalake::PartitionFilter {
        key: "event_date",
        value: deltalake::PartitionValue::Equal("2018-01-01"),
    }])?;

    Ok(())
}
I was also looking into the DataFusion documentation, but I didn't see any way to specify partitions there either.
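For reference, this is roughly what I was hoping to be able to do on the DataFusion side. It is only a sketch: it assumes the deltalake crate is built with its datafusion feature (so that DeltaTable can be registered as a TableProvider), and the table/column names are just the ones from my schema:
use std::sync::Arc;

use datafusion::prelude::SessionContext;

async fn query_partition(table: deltalake::DeltaTable) -> Result<(), Box<dyn std::error::Error>> {
    // Register the Delta table with DataFusion (needs the `datafusion`
    // feature of the `deltalake` crate so DeltaTable implements TableProvider).
    let ctx = SessionContext::new();
    ctx.register_table("events", Arc::new(table))?;

    // Filter on the partition column; ideally only the matching files get scanned.
    let df = ctx
        .sql("SELECT * FROM events WHERE event_date = '2018-01-01'")
        .await?;
    let batches = df.collect().await?;
    println!("fetched {} record batches", batches.len());

    Ok(())
}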
Upvotes: 2
Views: 443
Reputation: 1
Just an example code snippet that shows how to combine the Azure Storage SDK for Rust with the parquet crate to download a Parquet file from an ADLS account using account-key credentials and open it for reading:
use azure_storage::prelude::*;
use azure_storage_blobs::prelude::*;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Azure storage account credentials
    let account_name = "<your_account_name>";
    let account_key = "<your_account_key>";
    let container_name = "<your_container_name>";
    let blob_name = "<your_blob_name>";

    // Build a blob client authenticated with the account key
    // (API of the unofficial `azure_storage` / `azure_storage_blobs` crates)
    let credentials = StorageCredentials::access_key(account_name.to_string(), account_key.to_string());
    let blob_client =
        ClientBuilder::new(account_name, credentials).blob_client(container_name, blob_name);

    // Download the blob (one Parquet data file) into memory
    let data = blob_client.get_content().await?;

    // Open it with the parquet crate and expose it as Arrow record batches
    // (older `parquet` API; newer releases use ParquetRecordBatchReaderBuilder)
    let file_reader = SerializedFileReader::new(bytes::Bytes::from(data))?;
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
    let record_batch_reader = arrow_reader.get_record_reader(1024)?;
    for batch in record_batch_reader {
        let batch = batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }

    Ok(())
}
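Note that this reads the whole blob into memory before parsing it, so for a table of this size you would still want to download only the data files that belong to the partitions you actually need, rather than everything in the container.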
Upvotes: 0
Reputation: 1
I think you can take the result of get_file_uris_by_partitions (or its sibling get_files_by_partitions), which gives you the data files of the matching partitions, and read those Parquet files yourself with the parquet crate's SerializedFileReader, fetching the bytes through the table's object store. Here's an example, assuming a delta-rs version that exposes DeltaTable::object_store():
use std::collections::HashMap;

use deltalake::delta::open_table_with_storage_options;
use object_store::ObjectStore;
use parquet::file::reader::{FileReader, SerializedFileReader};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut azure_storage_options = HashMap::new();
    azure_storage_options.insert(
        "azure_storage_account_name".to_string(),
        "name".to_string(),
    );
    azure_storage_options.insert(
        "azure_storage_account_key".to_string(),
        "key".to_string(),
    );

    let path = "az://name/path/to_data/";
    let table = open_table_with_storage_options(path, azure_storage_options).await?;

    /* Get files of a particular partition.
       In recent delta-rs versions this returns paths relative to the table root;
       get_file_uris_by_partitions returns fully qualified az:// URIs instead. */
    let files = table.get_files_by_partitions(&[deltalake::PartitionFilter {
        key: "event_date",
        value: deltalake::PartitionValue::Equal("2018-01-01"),
    }])?;

    /* The table's object store already knows how to talk to ADLS, so use it
       to download each Parquet data file and iterate over its rows */
    let store = table.object_store();
    for file in files {
        let bytes = store.get(&file).await?.bytes().await?;
        let reader = SerializedFileReader::new(bytes)?;
        for record in reader.get_row_iter(None)? {
            /* Process the record (a parquet Row) */
            println!("{:?}", record);
        }
    }

    Ok(())
}
Upvotes: 0