Richie

Reputation: 4432

ON DUPLICATE KEY UPDATE while inserting from pyspark dataframe to an external database table via JDBC

I'm using PySpark, and I have a Spark dataframe whose data I insert into a MySQL table.

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

I want to update a column value (which is not part of the primary key) by adding a specific number to its current value.

I've tried the different modes (append, overwrite) of the DataFrameWriter.jdbc() function.

My question is: how do I update a column value, the way ON DUPLICATE KEY UPDATE does in MySQL, while inserting the PySpark dataframe data into a table?
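
For concreteness, this is the kind of statement the question is asking for, a plain MySQL upsert that adds the incoming value to the existing one (the id and counter columns, and the number 5, are made up for the example):

# Hypothetical schema: myTable(id PRIMARY KEY, counter INT)
# Insert a row; if the id already exists, add 5 to the existing counter instead.
upsert_sql = """
INSERT INTO myTable (id, counter) VALUES (42, 5)
ON DUPLICATE KEY UPDATE counter = counter + 5
"""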

Upvotes: 15

Views: 7974

Answers (2)

Napoleon Borntoparty

Reputation: 1962

This is not possible in vanilla PySpark (or Scala Spark, for that matter), as there are only four write modes (source: https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc):

append: Append contents of this DataFrame to existing data.

overwrite: Overwrite existing data.

ignore: Silently ignore this operation if data already exists.

error or errorifexists (default case): Throw an exception if data already exists.

There are, however, a few hacky workarounds:

  1. There's a Python package, JayDeBeApi, that lets you execute JDBC statements directly, so you can structure your code to run INSERT ... ON DUPLICATE KEY UPDATE ... yourself (see the first sketch after this list). Here's the link: https://pypi.org/project/JayDeBeApi/

  2. If you're comfortable in Scala, you can write a new save mode or override org.apache.spark.sql.execution.datasources.jdbc (JdbcUtils.scala), changing the INSERT INTO statement it generates to INSERT ... ON DUPLICATE KEY UPDATE .... Or, even better, use a MERGE statement such as:

MERGE INTO target_table
USING source_table AS src
ON <match condition>
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

Depending on your flavour of SQL.

  3. Use a staging table that you overwrite, then write a simple MySQL trigger on that staging table so that it runs INSERT INTO target_table ... ON DUPLICATE KEY UPDATE.

  4. Convert your Spark DataFrame to a pandas DataFrame and write your upsert query there using SQLAlchemy and raw queries (see the second sketch after this list).

  5. Create a pipeline using Spark Streaming backed by Apache Kafka, then use a tool with JDBC upsert functionality such as Kafka Connect to upsert directly into your target table, or use Kafka Connect to upsert from a staging table into the target table. Here's some reading: https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes
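
A minimal sketch of workaround 1, assuming the MySQL Connector/J jar is available on the driver and the hypothetical myTable(id, counter) schema from the question; the driver class and jar path will vary with your setup, and collecting the DataFrame to the driver only makes sense for modest row counts:

import jaydebeapi

# Pull the rows to the driver (fine for small DataFrames only).
rows = [(r["id"], r["counter"]) for r in df.collect()]

conn = jaydebeapi.connect(
    "com.mysql.cj.jdbc.Driver",               # JDBC driver class
    "jdbc:mysql://hostname/myDB",
    ["xyz", "pwd"],                           # user, password
    "/path/to/mysql-connector-java.jar",      # jar containing the driver
)
try:
    cur = conn.cursor()
    cur.executemany(
        "INSERT INTO myTable (id, counter) VALUES (?, ?) "
        "ON DUPLICATE KEY UPDATE counter = counter + VALUES(counter)",
        rows,
    )
    conn.commit()
finally:
    conn.close()

For larger data you could run the same statement inside df.foreachPartition instead of collecting everything to the driver.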
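
And a sketch of workaround 4, assuming SQLAlchemy plus a MySQL driver (pymysql here) is installed on the driver, again with the hypothetical myTable(id, counter) schema; toPandas() pulls everything into driver memory, so this also only suits data that fits there:

import sqlalchemy

pdf = df.toPandas()   # brings the whole DataFrame to the driver

# Plain Python values so the MySQL driver accepts them (not NumPy scalars).
records = [
    {"id": int(row.id), "counter": int(row.counter)}
    for row in pdf.itertuples(index=False)
]

engine = sqlalchemy.create_engine("mysql+pymysql://xyz:pwd@hostname/myDB")
upsert = sqlalchemy.text(
    "INSERT INTO myTable (id, counter) VALUES (:id, :counter) "
    "ON DUPLICATE KEY UPDATE counter = counter + VALUES(counter)"
)
with engine.begin() as conn:   # commits on success, rolls back on error
    conn.execute(upsert, records)   # executes once per record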

Upvotes: 4

ThatDataGuy

Reputation: 2109

A workaround is to insert the data into a staging table and then migrate it into the final table using a SQL statement executed by the driver program. You can then use any SQL syntax that is valid for your database provider.
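
A rough sketch of that idea, assuming a staging table named myTable_staging and a plain MySQL client (mysql-connector-python here) on the driver; all table and column names are made up for the example:

import mysql.connector

# 1) Overwrite the staging table with the DataFrame's contents.
#    (With mode="overwrite" Spark recreates the table unless the truncate option is set.)
df.write.jdbc(url=url, table="myTable_staging", mode="overwrite")

# 2) From the driver, merge staging into the target with ordinary MySQL.
conn = mysql.connector.connect(host="hostname", user="xyz", password="pwd", database="myDB")
try:
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO myTable (id, counter) "
        "SELECT id, counter FROM myTable_staging "
        "ON DUPLICATE KEY UPDATE counter = myTable.counter + myTable_staging.counter"
    )
    conn.commit()
finally:
    conn.close()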

Upvotes: 1
