Haha
Haha

Reputation: 1009

Spark Scala code not working similarly then its pyspark version

I have a general question about Spark.

Should Pyspark and Scala Spark always have the same behaviour when we use the exact same code ?

If yes, how can ou explain this example:

Scala version:

val inputDf = spark
      .readStream
      .format("csv")
      .schema(schema)
      .option("ignoreChanges", "true")
      .option("delimiter", ";").option("header", true)
      .load("/input/")


def processIsmedia(df: DataFrame, batchId: Long): Unit = {

      val ids = df
        .select("id").distinct().collect().toList
        .map(el => s"$el")


      ids.foreach { id =>
        val datedDf = df.filter(col("id") === id)

        datedDf
          .write
          .format("delta")
          .option("mergeSchema", "true")
          .partitionBy("id")
          .option("replaceWhere", s"id == '$id'")
          .mode("overwrite")
          .save("/res/")
      }
    }


    inputDf 
      .writeStream
      .format("delta")
      .foreachBatch(processIsmedia _)
      .queryName("tgte")
      .option("checkpointLocation", "/check")
      .trigger(Trigger.Once)
      .start()

Python version:

inputDf = spark \
      .readStream  \
      .format("csv") \
      .schema(schema) \
      .option("ignoreChanges", "true") \
      .option("delimiter", ";").option("header", True) \
      .load("/in/") \

def processDf(df, epoch_id):
  PartitionKey = "id"
  df.cache()
  ids=[x.id for x in df.select("id").distinct().collect()]
  for idd in ids:
    idd =str(idd)
    tmp = df.filter(df.id == idd)
    tmp.write.format("delta").option("mergeSchema", "true").partitionBy(PartitionKey).option("replaceWhere", "id == '$i'".format(i=idd)).save("/res/")
  
inputDf.writeStream.format("delta").foreachBatch(processDf).queryName("aaaa").option("checkpointLocation", "/check").trigger(once=True).start()

Both codes are exactly equivalent. They are supposed to write data (append new partitions and overwrite existant ones).

With Scala it is working perfectly fine. With Python I am having an error :

Data written out does not match replaceWhere 'id == '$i''.

So my question is: Isnt spark the same thing whether it is used with Scala, Java, Python or even R ? How can this error be possible then ?

Upvotes: 0

Views: 145

Answers (1)

ggordon
ggordon

Reputation: 10035

The python code is not performing a replace for the value in idd and the resulting string is "id == '$i'" which is not the case in your scala code i.e.

.option("replaceWhere", "id == '$i'".format(i=idd))

should be

.option("replaceWhere", "id == '{i}'".format(i=idd))

Let me know if this change works for you.

Upvotes: 1

Related Questions