I have a bronze-level Delta Lake table (events_bronze) at location "/mnt/events-bronze", to which data is streamed from Kafka. Now I want to stream from this table and upsert into a silver table (events_silver) using "foreachBatch". This can be done by using the bronze table as a streaming source. However, on the initial run events_silver doesn't exist yet, so I keep getting an error saying the Delta table doesn't exist, which is expected. So how do I create events_silver with the same structure as events_bronze? I couldn't find a DDL to do that.
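import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
// Upsert each micro-batch from bronze into the silver table, matching rows on id
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id = bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}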
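import org.apache.spark.sql.streaming.Trigger
// Read the bronze table as a streaming source
val events_bronze = spark.readStream.format("delta").load("/mnt/events-bronze")
events_bronze
  .writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()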
During the initial run, the problem is that no Delta Lake table exists at path "/mnt/events-silver". I'm not sure how to create it with the same structure as "/mnt/events-bronze" before the first run.
Upvotes: 3
Views: 12881
Here's a PySpark example:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from delta.tables import DeltaTable
basePath = 'abfss://stage2@your_storage_account_name.dfs.core.windows.net'
schema = StructType([StructField('SignalType', StringType()), StructField('StartTime', TimestampType())])
# Create an empty Delta table with this schema only if there is no Delta table at the path yet
if not DeltaTable.isDeltaTable(spark, basePath + '/tutorial_01/test1'):
    emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    emptyDF.write.format('delta').mode('overwrite').save(basePath + '/tutorial_01/test1')
And here's an updated PySpark example using the newer createIfNotExists API:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from delta.tables import DeltaTable
schema = StructType([StructField('SignalType', StringType()), StructField('StartTime', TimestampType())])
# Creates the table at this location only if it doesn't already exist
DeltaTable.createIfNotExists(spark) \
    .location('abfss://stage2@your_storage_account_name.dfs.core.windows.net/tutorial_01/test1') \
    .addColumns(schema) \
    .execute()
Upvotes: 2
As of release 1.0.0 of Delta Lake, the method DeltaTable.createIfNotExists() was added (Evolving API).
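In your example, DeltaTable.forPath(spark, "/mnt/events-silver")
can be replaced with:
DeltaTable.createIfNotExists(spark)
  .location("/mnt/events-silver")
  .addColumns(microBatchOutputDF.schema)
  .execute()
Since execute() returns a DeltaTable, the rest of the chain (.as("silver").merge(...)) stays the same.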
Be careful not to supply an .option("checkpointLocation", "/mnt/events-silver/_checkpoint"), i.e. a checkpointLocation that is a subdirectory of your Delta table's location. The _checkpoint directory is then created before the Delta table, and an exception is thrown when createIfNotExists tries to create the table.
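If the checkpoint path comes from configuration, a pre-flight check can catch this before the stream starts. This is a minimal sketch, assuming plain POSIX-style paths such as /mnt/...; checkpoint_inside_table is a hypothetical helper, not part of Spark or Delta Lake. It compares whole path components, so a sibling directory like /mnt/events-silver-checkpoint is not mistaken for a subdirectory, which a plain startswith() check would get wrong.

```python
from pathlib import PurePosixPath


def checkpoint_inside_table(table_path: str, checkpoint_path: str) -> bool:
    """Return True if checkpoint_path equals table_path or is nested below it.

    Hypothetical helper for illustration. Paths are compared component by
    component; ".." segments and symlinks are NOT resolved.
    """
    table = PurePosixPath(table_path)
    checkpoint = PurePosixPath(checkpoint_path)
    # parents lists every ancestor directory of the checkpoint path
    return checkpoint == table or table in checkpoint.parents
```

You would call it with the table path and the checkpointLocation before writeStream...start(), and fail fast if it returns True.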
Upvotes: 7
Before starting the streaming write/merge, check whether the table already exists. If it doesn't, create it from an empty DataFrame with the schema of events_bronze.
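import io.delta.tables.DeltaTable
import org.apache.spark.sql.{Row, SaveMode}
val exists = DeltaTable.isDeltaTable(spark, "/mnt/events-silver")
if (!exists) {
  // Reuse the bronze table's schema so silver has the same structure
  val bronzeSchema = spark.read.format("delta").load("/mnt/events-bronze").schema
  val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], bronzeSchema)
  emptyDF
    .write
    .format("delta")
    .mode(SaveMode.Overwrite)
    .save("/mnt/events-silver")
}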
The table (its Delta Lake metadata) is created only once, at the first start, when it doesn't exist yet. On a job restart it is already present, so table creation is skipped.
Upvotes: 7
You can get the table definition with Spark SQL. First run the following, which prints the DDL of the bronze table (this assumes events_bronze is registered in the metastore):
spark.sql("show create table events_bronze").show(false)
After getting the DDL, change the LOCATION to the silver table's path (and the table name to events_silver) and run that statement in Spark SQL.
Note: Use "create table if not exists ..." so the statement doesn't fail when the table has already been created, e.g. by an earlier or concurrent run.
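The "change the location" step can be scripted. This is a minimal sketch; to_silver_ddl is a hypothetical helper, and it assumes the SHOW CREATE TABLE output starts with CREATE TABLE <name> and contains exactly one LOCATION '<path>' clause. That shape may differ between Spark and Databricks versions, so the helper raises ValueError rather than guessing when it doesn't match.

```python
import re


def to_silver_ddl(bronze_ddl: str, silver_table: str, silver_location: str) -> str:
    """Rewrite a bronze CREATE TABLE statement into an idempotent silver one.

    Hypothetical helper; assumes a single "CREATE TABLE <name>" prefix and a
    single "LOCATION '<path>'" clause in the input DDL.
    """
    # Swap in the silver table name and add IF NOT EXISTS so reruns don't fail.
    # A callable replacement avoids backslash escapes in the new name/path.
    ddl, n_create = re.subn(
        r"^\s*CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?[^\s(]+",
        lambda _m: f"CREATE TABLE IF NOT EXISTS {silver_table}",
        bronze_ddl,
        count=1,
        flags=re.IGNORECASE,
    )
    # Point the new table at the silver path instead of the bronze path.
    ddl, n_location = re.subn(
        r"LOCATION\s+'[^']*'",
        lambda _m: f"LOCATION '{silver_location}'",
        ddl,
        count=1,
        flags=re.IGNORECASE,
    )
    if n_create != 1 or n_location != 1:
        raise ValueError("DDL does not have the expected CREATE TABLE ... LOCATION '...' shape")
    return ddl
```

The returned string can then be passed to spark.sql(...).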
Upvotes: 0