JDev

Reputation: 1822

spark-rdbms: Overwrite mode works differently from Append

I am on Spark 3.0.0-preview, trying to save a dataset to a PostgreSQL database. These are the steps I am following:

  1. Get the data from table A
  2. Get the data from table B (same structure as A)
  3. Do a left anti join between Table B and Table A. This is done to get the rows from Table B that are not in Table A.
  4. Union Table A with the outcome of Step 3. This is done to get the unique rows across tables A and B.
  5. Save the result to Table B with Overwrite mode.

Actual: only the rows from table A end up in the database.

Expected: the union of the rows from table A and the rows from Step 3 should end up in the database.

Analysis: if I use 'Append' mode the record count is correct, but I want the table truncated, not appended to.

Code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  .appName("Testing")
  .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

val tableA = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("query", "select t.uid, t.employer_key, t.name from (select uid, employer_key, name, row_number() over (partition by employer_key order by updated_at desc) as rn from test.table_a) t where t.rn = 1")
  .load()

val tableB = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
  .load()

val nonUpdatedDFRows = tableB.join(tableA, tableB("employer_key") === tableA("employer_key"), "leftanti")

nonUpdatedDFRows.show(5) //Working correctly

val refreshDF = nonUpdatedDFRows.unionByName(tableA)

refreshDF.show(5) //Working correctly

refreshDF.write.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("dbtable", "test.table_b")
  .option("truncate", "true")
  .mode("overwrite")
  .save()

// Only rows from table_a end up in the DB, but if I change the mode to Append it works fine.
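For reference, this is the Append run that gives the correct record count; it is the same write with only the mode changed (the truncate option is dropped here because it only applies to Overwrite mode):

refreshDF.write.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("dbtable", "test.table_b")
  .mode("append")
  .save()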

Upvotes: 0

Views: 1089

Answers (1)

JDev

Reputation: 1822

The problem in my code was that I was trying to overwrite the same table I was reading from. Because Spark evaluates DataFrames lazily, the overwrite truncated test.table_b before the rows derived from it had actually been read, so only the rows from table A survived.

To solve the problem, I have to cache the DataFrame first, as below:

val tableB = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
  .load()
  .cache()

Then perform an action to force Spark to actually load the data, as below:

tableB.show(2)
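Note that cache() is only a hint: if Spark has to evict cached partitions, it will recompute them against the already-truncated table. A sturdier variant (my own sketch, not required for the fix above) is to checkpoint the DataFrame so the rows are eagerly materialized to disk before the overwrite; the checkpoint directory below is a placeholder path:

// Checkpointing cuts the lineage back to the JDBC source, so the
// final write can no longer trigger a re-read of test.table_b.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // placeholder

val tableBStable = tableB.checkpoint() // eager by default

val nonUpdatedDFRows = tableBStable.join(tableA,
  tableBStable("employer_key") === tableA("employer_key"), "leftanti")
val refreshDF = nonUpdatedDFRows.unionByName(tableA)

refreshDF.write.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/test")
  .option("user", "sample")
  .option("password", "sample")
  .option("dbtable", "test.table_b")
  .option("truncate", "true")
  .mode("overwrite")
  .save()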

Upvotes: 2
