I have a general question about Spark.
Should PySpark and Scala Spark always behave the same when we use exactly the same code?
If so, how can you explain this example:
Scala version:
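import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val inputDf = spark
  .readStream
  .format("csv")
  .schema(schema)
  .option("ignoreChanges", "true")
  .option("delimiter", ";").option("header", true)
  .load("/input/")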
def processIsmedia(df: DataFrame, batchId: Long): Unit = {
  // Row.toString renders as "[value]", so extract the column value explicitly
  val ids = df
    .select("id").distinct().collect().toList
    .map(row => row.get(0).toString)
  ids.foreach { id =>
    val datedDf = df.filter(col("id") === id)
    datedDf
      .write
      .format("delta")
      .option("mergeSchema", "true")
      .partitionBy("id")
      .option("replaceWhere", s"id == '$id'")
      .mode("overwrite")
      .save("/res/")
  }
}
inputDf
  .writeStream
  .format("delta")
  .foreachBatch(processIsmedia _)
  .queryName("tgte")
  .option("checkpointLocation", "/check")
  .trigger(Trigger.Once)
  .start()
Python version:
inputDf = spark \
    .readStream \
    .format("csv") \
    .schema(schema) \
    .option("ignoreChanges", "true") \
    .option("delimiter", ";").option("header", True) \
    .load("/in/")
def processDf(df, epoch_id):
    PartitionKey = "id"
    df.cache()
    ids = [x.id for x in df.select("id").distinct().collect()]
    for idd in ids:
        idd = str(idd)
        tmp = df.filter(df.id == idd)
        tmp.write.format("delta") \
            .option("mergeSchema", "true") \
            .partitionBy(PartitionKey) \
            .option("replaceWhere", "id == '$i'".format(i=idd)) \
            .save("/res/")
inputDf.writeStream \
    .format("delta") \
    .foreachBatch(processDf) \
    .queryName("aaaa") \
    .option("checkpointLocation", "/check") \
    .trigger(once=True) \
    .start()
Both snippets are exactly equivalent. They are supposed to write data (appending new partitions and overwriting existing ones).
With Scala it works perfectly fine. With Python I get an error:
Data written out does not match replaceWhere 'id == '$i''.
So my question is: isn't Spark the same whether it is used from Scala, Java, Python or even R? How is this error possible, then?
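The Python code never substitutes the value of idd. str.format only replaces {}-style fields, so "id == '$i'".format(i=idd) leaves $i untouched and the resulting string is literally "id == '$i'".
In your Scala code, s"id == '$id'" uses Scala's s-interpolator, which does substitute $id. So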
.option("replaceWhere", "id == '$i'".format(i=idd))
should be
.option("replaceWhere", "id == '{i}'".format(i=idd))
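To see the difference outside Spark, here is a minimal plain-Python sketch comparing the original template with two working alternatives. The value "42" is a hypothetical stand-in for str(x.id); nothing here touches Spark itself.

```python
# Hypothetical id value, standing in for str(x.id) inside processDf
idd = "42"

# Original template: str.format() only fills {}-style fields and silently
# ignores the unused keyword argument, so "$i" stays in the string as-is.
buggy = "id == '$i'".format(i=idd)

# Fix 1: use a {}-style named field, as in the answer above
fixed_format = "id == '{i}'".format(i=idd)

# Fix 2: an f-string (Python 3.6+), the closest analogue to Scala's s"..." interpolation
fixed_fstring = f"id == '{idd}'"

print(buggy)          # → id == '$i'
print(fixed_format)   # → id == '42'
print(fixed_fstring)  # → id == '42'
```

Because the unsubstituted predicate matches none of the rows being written, Delta reports that the data does not match the replaceWhere condition; with either fix the predicate and the filtered data agree.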
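Also note that, unlike the Scala version, the Python write has no .mode("overwrite"); replaceWhere is meant to be used together with overwrite mode, so add that call too.
Let me know if these changes work for you.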