Reputation: 1822
I am on Spark 3.0.0-preview and trying to save a dataset to the PostgreSQL database. Following are the steps that I am following:
Actual: Only rows from table_a get updated in the DB. Expected: A union of record of table a and step 3 should get updated on the database. Analysis: If I use mode as 'Append' the records count is correct but I am looking for truncating the table and not appending.
Code:
val spark = SparkSession.builder.master("local[*]").appName("Testing")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val tableA = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from (select uid, employer_key, name , row_number() over(partition by employer_key order by updated_at desc) as rn from test.table_a) t where t.rn = 1")
.load()
val tableB = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load()
val nonUpdatedDFRows = tableB.join(tableA, tableB("employer_key") === tableA("employer_key"), "leftanti")
nonUpdatedDFRows.show(5) //Working correctly
val refreshDF = nonUpdatedDFRows.unionByName(tableA)
refreshDF.show(5) //Working correctly
refreshDF.write.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("dbtable", "test.table_b")
.option("truncate", "true").mode("overwrite")
.save();
//only rows from table_a get updated in the DB but if I change the mode to Append, it will work fine.
Upvotes: 0
Views: 1089
Reputation: 1822
The problem in my code was that I was trying to overwrite the same table from which I was reading.
To solve the problem I have to first cache the value, as below:
val tableB = spark.read.format("jdbc").option("url",
"jdbc:postgresql://localhost:5432/test")
.option("user", "sample")
.option("password", "sample")
.option("query", "select t.uid, t.employer_key, t.name from test.table_b t")
.load().cahce()
Do some operation to enforce spark to load data, as below:
tableB.show(2)
Upvotes: 2