b-j

Reputation: 11

Delta mergeSchema doesn't work using MemoryStream with Spark checkpoint

I am testing a DeltaWriter class using Spark's MemoryStream to create a stream (rather than readStream), and I want to write the result to S3 as a Delta table with the option "mergeSchema": true, as shown below:

import org.apache.spark.sql.{Encoder, Encoders, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
import io.delta.implicits._

case class EnrichedPerson(id: Int, name: String, age: Int, address: String)
case class BasePerson(id: Int, name: String, age: Int)

// MemoryStream needs an implicit SQLContext and an Encoder in scope
// (spark is the existing SparkSession, e.g. from spark-shell)
implicit val sqlContext: SQLContext = spark.sqlContext

val data1: List[BasePerson] = List(BasePerson(1, "mark", 30), BasePerson(2, "paul", 25))
implicit val encoder1: Encoder[BasePerson] = Encoders.product[BasePerson]
val memStream1 = MemoryStream[BasePerson]
memStream1.addData(data1)
val stream1 = memStream1.toDS().toDF()

// First run: writes the BasePerson schema (id, name, age) to the Delta sink
val streamQuery1 = stream1
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")
streamQuery1.processAllAvailable()

val data2: List[EnrichedPerson] = List(EnrichedPerson(11, "jhon", 31, "street 1"), EnrichedPerson(22, "luis", 32, "street 2"))
implicit val encoder2: Encoder[EnrichedPerson] = Encoders.product[EnrichedPerson]
val memStream2 = MemoryStream[EnrichedPerson]
memStream2.addData(data2)
val stream2 = memStream2.toDS().toDF()

// Second run: same checkpoint and target path, but the schema now has an
// extra column (address) that mergeSchema should add to the table
val streamQuery2 = stream2
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .outputMode("append")
  .option("checkpointLocation", "my/checkpointpoint/location1")
  .delta("s3://bucket/raw/data/..-")

streamQuery2.processAllAvailable()

The first time, the code works well (streamQuery1) with a certain input schema (e.g. col(A), col(B), col(C)). If I then change the schema by adding a new column (e.g. col(A), col(B), col(C), col(D)), the same code (streamQuery2) doesn't update the Delta table with the new column D, even though I am using Delta as the sink with mergeSchema enabled. I don't get any error; however, streamQuery2 doesn't write any data.
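
One way to verify this is to read the table back after each run (a minimal sketch, using the same placeholder path as above):

// Read the Delta table back and inspect the schema: after streamQuery2 the
// new address column should appear here, but it does not
val result = spark.read.format("delta").load("s3://bucket/raw/data/..-")
result.printSchema()
result.show()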

According to the Spark docs, Recovering from Failures with Checkpointing, I should be able to change the schema between the two executions, because my sink (Delta) allows schema changes:

Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".

The output produced on S3 by the first execution of the above query is:

deltaTable/
|
|__checkpoint/
|   |__commits/
|   |   |__0
|   |__offsets/
|   |   |__0
|   |__metadata
|
|__delta/
   |__....

Inspecting the content of the checkpoint folder, I found that there is no file with metadata about the schema of my data. Indeed, the contents are:

cat deltaTable/checkpoint/metadata

{"id":"b48487ca-5374-4b93-8e26-503184f2ed57"}

cat deltaTable/checkpoint/commits/0

v1 {"nextBatchWatermarkMs":0}

cat deltaTable/checkpoint/offsets/0

v1 {"batchWatermarkMs":0,"batchTimestampMs":1649859656284,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}} 0

As reported in link1 and link2, it is sufficient to delete the checkpoint to solve this issue. However, the impact of this solution is significant: after deleting the checkpoint, how can I start from the same offset?
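
For a replayable source such as Kafka (unlike MemoryStream), I assume the offsets recorded in checkpoint/offsets/<n> could be copied into a fresh query, along these lines (broker and topic are made-up names):

// Hypothetical restart after deleting the checkpoint: a Kafka source can be
// replayed from explicit offsets, while a MemoryStream cannot
val restarted = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")     // assumed broker
  .option("subscribe", "people")                        // assumed topic
  .option("startingOffsets", """{"people":{"0":42}}""") // offsets taken from the old checkpoint
  .load()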

Can anyone explain why the query works after deleting the checkpoint, even though there is no metadata about the schema in the checkpoint folder? And how can I use both the checkpoint and the mergeSchema option together to make this schema evolution test work?

Thanks in advance!

Upvotes: 1

Views: 694

Answers (1)

Alex Ott

Reputation: 87259

The schema of the table is recorded in the Delta log, not in the checkpoint. You need to check the JSON files under the _delta_log directory of your table (for example /user/hive/warehouse/table_name).
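
For example (log file name is illustrative; in practice read the latest JSON commit), the schema can be pulled straight out of the log:

// Each commit under _delta_log is a JSON-lines file; the metaData action
// holds the table schema as schemaString
val log = spark.read.json("/user/hive/warehouse/table_name/_delta_log/00000000000000000000.json")
log.select("metaData.schemaString").where("metaData IS NOT NULL").show(false)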

Upvotes: 0
