Adrian David Smith

Reputation: 598

Delta Standalone - Scan for Specific Data

I'm using the Delta Standalone library to read data from a Delta Table. My goal is to assert that specific data was processed and persisted by an upstream function/service.

I am able to do this by naively retrieving all files as shown by the function below:

import io.delta.standalone.DeltaLog
import io.delta.standalone.data.RowRecord

fun retrieveRecords(): List<RowRecord> {
    val log = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
    val snapshot = log.snapshot()
    // open() streams every RowRecord in the snapshot, with no pruning at all
    val iter = snapshot.open()
    val rows = mutableListOf<RowRecord>()
    while (iter.hasNext()) {
        rows.add(iter.next())
    }
    iter.close()
    return rows
}

This, however, does not scale. I've seen in the Delta Standalone documentation that the scan() operation looks promising, stating that it can be used to do the following:

Access the files that match the partition filter portion of the readPredicate with DeltaScan::getFiles. This returns a memory-optimized iterator over the metadata files in the table.

To further filter the returned files on non-partition columns, get the portion of input predicate not applied with DeltaScan::getResidualPredicate.
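
If I'm reading that right, scan() only prunes on the partition-filter portion of the predicate, and whatever could not be pushed down comes back to be applied per row. A rough sketch of my reading (untested; residualPredicate is the Kotlin accessor for the documented DeltaScan::getResidualPredicate, and the column name is made up):

import io.delta.standalone.expressions.Column
import io.delta.standalone.expressions.EqualTo
import io.delta.standalone.expressions.Literal
import io.delta.standalone.types.StringType

val scan = log.snapshot().scan(
    EqualTo(Column("partitioned_col_1", StringType()), Literal.of("partition_val_1"))
)
val files = scan.files                    // partition-pruned AddFile metadata
val residual = scan.residualPredicate     // Optional<Expression> I still have to apply myself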

From my understanding, I should be able to use this scan() operation, pass a predicate over the partitioned columns, access the returned files (essentially their path attribute), and do some conversion from there.

I'm struggling to:

  1. Pass the partitions as the required Expression type in the scan() operation; and
  2. Find a way to then convert the retrieved file paths into RowRecords to do the assertions.
val iter = log.snapshot().scan(
    EqualTo(
        Column("partitioned_col_1", StringType()),
        Literal.of("partition_val_1"),
    )
    // Scan expects only a single Expression
    // Multiple Expressions are not allowed
    // ,
    // EqualTo(
    //    Column("partitioned_col_2", StringType()),
    //    Literal.of("partition_val_2"),
    // ),
    // EqualTo(
    //    Column("partitioned_col_3", StringType()),
    //    Literal.of("partition_val_3"),
    // )
).files

val paths = mutableListOf<String>()
while (iter.hasNext()) {
    paths.add(iter.next().path)
}

// for path in paths:
//     read and convert into RowRecord

I seem to be missing a few key pieces of information here. Any advice would be greatly appreciated.

Upvotes: 1

Views: 634

Answers (1)

deo

Reputation: 936

You have to use CloseableParquetDataIterator to iterate over the scanned files as RowRecords. It lives in an internal package, so you can't access it from outside directly. Here is a hack to access it via reflection, tested.

import io.delta.standalone.DeltaLog
import io.delta.standalone.data.{CloseableIterator, RowRecord}
import io.delta.standalone.expressions.{EqualTo, Literal}

import scala.collection.JavaConverters._

val deltaLog = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
val snapshot = deltaLog.snapshot()

// Partition-pruned listing of the files matching partitioned_col_1 = "partition_val_1"
val allFiles = snapshot.scan(
  new EqualTo(
    snapshot.getMetadata.getSchema.column("partitioned_col_1"),
    Literal.of("partition_val_1"))).getFiles

// CloseableParquetDataIterator wants a (path, partitionValues) pair per file
val filesAndPartitions = allFiles.asScala.toSeq.map { add =>
  (DELTA_TABLE_LOCATION + "/" + add.getPath, add.getPartitionValues.asScala.toMap)
}

// The class is private[internal], so instantiate it reflectively
val rowIterator = Class
  .forName("io.delta.standalone.internal.data.CloseableParquetDataIterator")
  .getConstructor(
    classOf[Seq[(String, Map[String, String])]],
    snapshot.getMetadata.getSchema.getClass,
    classOf[java.util.TimeZone],
    classOf[org.apache.hadoop.conf.Configuration])
  .newInstance(
    filesAndPartitions,
    snapshot.getMetadata.getSchema,
    null,
    new org.apache.hadoop.conf.Configuration())
  .asInstanceOf[CloseableIterator[RowRecord]]

// your result here
while (rowIterator.hasNext) {
  println(rowIterator.next)
}
rowIterator.close()

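On the question's first point (passing several partition filters): scan() takes a single Expression, but Delta Standalone's expressions package includes And, so multiple filters can presumably be nested into one conjunction. A sketch in the question's Kotlin (same column names as the question; untested):

import io.delta.standalone.expressions.And
import io.delta.standalone.expressions.EqualTo
import io.delta.standalone.expressions.Literal

val schema = snapshot.metadata.schema
// Nest binary Ands to express col1 = v1 AND col2 = v2 AND col3 = v3
val combined = And(
    And(
        EqualTo(schema.column("partitioned_col_1"), Literal.of("partition_val_1")),
        EqualTo(schema.column("partitioned_col_2"), Literal.of("partition_val_2"))
    ),
    EqualTo(schema.column("partitioned_col_3"), Literal.of("partition_val_3"))
)
val prunedFiles = snapshot.scan(combined).files
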
Upvotes: 0
