Reputation: 598
I'm using the Delta Standalone library to read data from a Delta Table. My goal is to assert that specific data was processed and persisted by an upstream function/service.
I am able to do this naively by retrieving all records, as shown by the function below:
fun retrieveRecords(): List<RowRecord> {
    val log = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
    val snapshot = log.snapshot()
    val rows = mutableListOf<RowRecord>()
    // open() returns a CloseableIterator<RowRecord>; close it once drained
    snapshot.open().use { iter ->
        while (iter.hasNext()) {
            rows.add(iter.next())
        }
    }
    return rows
}
This, however, does not scale. I've seen in the Delta Standalone documentation that the scan() operation looks promising, stating that it can be used to do the following:
Access the files that match the partition filter portion of the readPredicate with DeltaScan::getFiles. This returns a memory-optimized iterator over the metadata files in the table.
To further filter the returned files on non-partition columns, get the portion of input predicate not applied with DeltaScan::getResidualPredicate.
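My reading of that API in Kotlin (untested; readPredicate below is a placeholder for whatever filter Expression is passed in):
import io.delta.standalone.DeltaLog
import io.delta.standalone.DeltaScan

val scan: DeltaScan = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
    .snapshot()
    .scan(readPredicate)
// Memory-optimized iterator over the AddFile metadata entries surviving partition pruning
val files = scan.files
// Optional<Expression>: the non-partition part of readPredicate, still to be applied per row
val residual = scan.residualPredicate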
From my understanding, I would be able to use this scan() operation, pass the partition columns, access the files (essentially the path attribute), and do some conversions from there.
I'm struggling to:

- construct the Expression type passed to the scan() operation so that it covers more than one partition column (my guess using And is sketched after the snippet below); and
- convert the resulting file paths into RowRecords to do the assertions (a possible direct Parquet read is sketched below as well).

val iter = log.snapshot().scan(
    EqualTo(
        Column("partitioned_col_1", StringType()),
        Literal.of("partition_val_1"),
    )
    // scan() expects only a single Expression;
    // multiple Expressions are not allowed:
    // ,
    // EqualTo(
    //     Column("partitioned_col_2", StringType()),
    //     Literal.of("partition_val_2"),
    // ),
    // EqualTo(
    //     Column("partitioned_col_3", StringType()),
    //     Literal.of("partition_val_3"),
    // )
).files

val paths = mutableListOf<String>()
while (iter.hasNext()) {
    paths.add(iter.next().path)
}
// for path in paths:
//     read and convert into RowRecord
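From browsing the expressions package, I suspect the way to filter on several partition columns is to fold the equality checks into a single Expression with And, since And takes exactly two children. Something like this, reusing the log from above (untested):
import io.delta.standalone.expressions.And
import io.delta.standalone.expressions.Column
import io.delta.standalone.expressions.EqualTo
import io.delta.standalone.expressions.Literal
import io.delta.standalone.types.StringType

// Chain the three equality filters pairwise into a single predicate
val predicate = And(
    And(
        EqualTo(Column("partitioned_col_1", StringType()), Literal.of("partition_val_1")),
        EqualTo(Column("partitioned_col_2", StringType()), Literal.of("partition_val_2")),
    ),
    EqualTo(Column("partitioned_col_3", StringType()), Literal.of("partition_val_3")),
)
val scan = log.snapshot().scan(predicate)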
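For turning the file paths back into rows, one idea would be to read each Parquet file directly with parquet-avro, which yields Avro GenericRecords rather than RowRecords but may be enough for assertions. A sketch, assuming the org.apache.parquet:parquet-avro dependency is on the classpath, and keeping in mind that partition column values live in AddFile.getPartitionValues rather than in the data files themselves:
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Read one data file (path relative to the table root) into Avro records
fun readParquet(relativePath: String): List<GenericRecord> {
    val inputFile = HadoopInputFile.fromPath(
        Path("$DELTA_TABLE_LOCATION/$relativePath"), configuration)
    val records = mutableListOf<GenericRecord>()
    AvroParquetReader.builder<GenericRecord>(inputFile).build().use { reader ->
        var record = reader.read()
        while (record != null) {
            records.add(record)
            record = reader.read()
        }
    }
    return records
}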
I seem to be missing a few key pieces of information here. Any advice would be greatly appreciated.
Upvotes: 1
Views: 634
Reputation: 936
You have to use CloseableParquetDataIterator to iterate over the scanned files. It lives in an internal package, so you can't access it directly from outside. Here is a hack to access it via reflection, tested.
import io.delta.standalone.DeltaLog
import io.delta.standalone.data.{CloseableIterator, RowRecord}
import io.delta.standalone.expressions.{EqualTo, Literal}
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

val deltaLog = DeltaLog.forTable(configuration, DELTA_TABLE_LOCATION)
val snapshot = deltaLog.snapshot()

// Partition-pruned scan; getFiles returns a CloseableIterator of AddFile actions
val allFiles = snapshot
  .scan(new EqualTo(
    snapshot.getMetadata.getSchema.column("partitioned_col_1"),
    Literal.of("partition_val_1")))
  .getFiles

// CloseableParquetDataIterator is private to io.delta.standalone.internal,
// so it cannot be imported here; construct it reflectively instead
val rowIterator = Class
  .forName("io.delta.standalone.internal.data.CloseableParquetDataIterator")
  .getConstructor(
    classOf[Seq[(String, Map[String, String])]],
    snapshot.getMetadata.getSchema.getClass,
    classOf[java.util.TimeZone],
    classOf[Configuration])
  .newInstance(
    // AddFile paths are relative to the table root; pair each with its partition values
    allFiles.asScala.toSeq.map { add =>
      (DELTA_TABLE_LOCATION + "/" + add.getPath, add.getPartitionValues.asScala.toMap)
    },
    snapshot.getMetadata.getSchema,
    null,
    new Configuration())
  .asInstanceOf[CloseableIterator[RowRecord]]

// your result here
rowIterator.next
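Back on the Kotlin side of the question, the resulting CloseableIterator<RowRecord> could then be drained into a list for the assertions (a minimal sketch, assuming the iterator is handed over to the test code):
import io.delta.standalone.data.CloseableIterator
import io.delta.standalone.data.RowRecord

fun drain(iter: CloseableIterator<RowRecord>): List<RowRecord> {
    val rows = mutableListOf<RowRecord>()
    // CloseableIterator extends Closeable, so use {} guarantees cleanup
    iter.use {
        while (it.hasNext()) {
            rows.add(it.next())
        }
    }
    return rows
}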
Upvotes: 0