Reputation: 61
I am trying to insert a Spark DataFrame into Postgres using a JDBC write. The Postgres table has a unique constraint on one of its columns. When the DataFrame to be inserted violates the constraint, the entire batch is rejected and the Spark session closes with the error "duplicate key value violates unique constraint". The error itself is correct, since the data is duplicate (it already exists in the database): org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148
What I need is for the rows that do not violate the constraint to be inserted and the failing rows to be ignored, without failing the entire batch.
The code used is:
mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"}
# Parentheses let the method chain span several lines in Python
(
    DF.write
    .option("numPartitions", partitions_for_parallelism)
    .option("batchsize", batch_size)
    .jdbc(url=url, table="table name", mode=mode, properties=properties)
)
How can I do this?
Upvotes: 2
Views: 3679
Reputation: 538
Unfortunately, Spark has no out-of-the-box solution for this. I see a number of possible approaches:
1. Implement the conflict-resolution logic yourself inside a foreachPartition function that writes to PostgreSQL. For example, catch the constraint-violation exception and report it to the log.
2. Drop the constraint on the PostgreSQL table and use an autogenerated primary key, which allows duplicate rows to be stored in the database. Deduplication can then be implemented as part of each SQL query, or run as a separate job on a daily or hourly basis. You can see an example here.
3. If no other system or process writes to the PostgreSQL table except your Spark job, you can filter the Spark DataFrame with a join that removes all rows already present in the table before calling DF.write.
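The join-based filter from the last option could look roughly like the sketch below. It is only an illustration under assumptions: the function names and the key_col parameter (the column carrying the unique constraint) are hypothetical, and it relies on nothing else writing to the table between the read and the write.

```python
def read_existing_keys(spark, url, table, key_col, properties):
    """Load only the unique-key column of the target table over JDBC."""
    existing = spark.read.jdbc(url=url, table=table, properties=properties)
    return existing.select(key_col)


def drop_existing_rows(df, existing_keys, key_col):
    """Keep only the rows of df whose key is absent from existing_keys."""
    # Deduplicate inside the batch as well: two new rows sharing a key
    # would still violate the unique constraint on insert.
    deduped = df.dropDuplicates([key_col])
    # A left anti join returns the left-side rows that have no match on the right.
    return deduped.join(existing_keys, on=key_col, how="left_anti")


def append_new_rows(spark, df, url, table, key_col, properties):
    """Append only the rows that are not yet present in the target table."""
    existing_keys = read_existing_keys(spark, url, table, key_col, properties)
    new_rows = drop_existing_rows(df, existing_keys, key_col)
    new_rows.write.jdbc(url=url, table=table, mode="append", properties=properties)
```

With the variables from the question, the call would be something like append_new_rows(spark, DF, url, "table name", "id", properties), where "id" stands in for the constrained column. Reading the keys adds a full scan of the key column on every run, so this fits best when the target table is modest in size.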
I hope these ideas are helpful.
Upvotes: 1
Reputation: 18098
That is not possible with a plain JDBC write if you have a unique constraint on the target. Spark's JDBC writer currently has no upsert mode, so you need to design around this.
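One way to design around the missing upsert mode, sketched here under assumptions, is to bypass the JDBC writer and let each partition run its own INSERT ... ON CONFLICT DO NOTHING through foreachPartition. This assumes PostgreSQL 9.5 or later, the psycopg2 driver installed on every executor, and trusted table and column names (they are interpolated into the SQL text). The helper names are hypothetical.

```python
def build_insert_sql(table, columns):
    """Build an INSERT that skips rows which would violate a unique constraint."""
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        "ON CONFLICT DO NOTHING"
    )


def make_partition_writer(conn_kwargs, table, columns):
    """Return a function that can be passed to DataFrame.foreachPartition."""
    sql = build_insert_sql(table, columns)

    def write_partition(rows):
        # Imported here so the import happens on the executor, not the driver.
        import psycopg2

        conn = psycopg2.connect(**conn_kwargs)
        try:
            # "with conn" commits on success and rolls back on an exception.
            with conn, conn.cursor() as cur:
                params = [tuple(row[c] for c in columns) for row in rows]
                cur.executemany(sql, params)
        finally:
            conn.close()

    return write_partition


# Usage with placeholder connection settings (all values are assumptions):
# writer = make_partition_writer(
#     {"host": "IP", "dbname": "DB name", "user": "username", "password": "password"},
#     "my_table",
#     DF.columns,
# )
# DF.foreachPartition(writer)
```

Rows that conflict are dropped silently by the database, so nothing is logged for them. If the rejected rows need to be reported, the statement would have to be extended, for example with a RETURNING clause compared against the input.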
Upvotes: 0