Reputation: 4432
Well, I'm using PySpark, and I have a Spark DataFrame whose data I insert into a MySQL table:
url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"
df.write.jdbc(url=url, table="myTable", mode="append")
I want to update a column value (which is not part of the primary key) to the sum of its current value and a specific number.
I've tried the different modes (append, overwrite) of the DataFrameWriter.jdbc() function.
My question is: how do we update a column value, the way ON DUPLICATE KEY UPDATE does in MySQL, while inserting the PySpark DataFrame data into a table?
Upvotes: 15
Views: 7974
Reputation: 1962
This is not possible in vanilla PySpark
(or Spark with Scala, for that matter), as you only have four write modes (source: https://spark.apache.org/docs/2.4.3/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc):
append: Append contents of this DataFrame to existing data.
overwrite: Overwrite existing data.
ignore: Silently ignore this operation if data already exists.
error or errorifexists (default case): Throw an exception if data already exists.
There are, however, a couple of hacky workarounds:
There's a Python package, JayDeBeApi (it runs on CPython via JPype, or on Jython), that lets you write JDBC queries directly, so you can structure your code to run INSERT ... ON DUPLICATE KEY UPDATE ... statements. Here's the link: https://pypi.org/project/JayDeBeApi/
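A minimal sketch of that approach, assuming the MySQL Connector/J jar is available on the driver machine and the DataFrame is small enough to collect; the driver class, jar path, and the table/column names (myTable, id, counter) are placeholders, not anything from the original setup:

import jaydebeapi

# Open a JDBC connection through the MySQL driver (driver class, credentials
# and jar path below are assumptions for this sketch).
conn = jaydebeapi.connect(
    "com.mysql.cj.jdbc.Driver",
    "jdbc:mysql://hostname/myDB",
    {"user": "xyz", "password": "pwd"},
    "/path/to/mysql-connector-java.jar",
)
cur = conn.cursor()

upsert_sql = """
    INSERT INTO myTable (id, counter)
    VALUES (?, ?)
    ON DUPLICATE KEY UPDATE counter = counter + VALUES(counter)
"""

# Collect the rows to the driver and upsert them in one batch; on a key
# collision the existing counter is incremented by the incoming value.
rows = [(r["id"], r["counter"]) for r in df.select("id", "counter").collect()]
cur.executemany(upsert_sql, rows)

# Depending on the connection's autocommit setting you may need an explicit
# conn.commit() here before closing.
cur.close()
conn.close()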
If you're comfortable in Scala, you can write a new save mode, or override org.apache.spark.sql.execution.datasources.jdbc (specifically JdbcUtils.scala) so that the generated INSERT INTO
becomes INSERT ... ON DUPLICATE KEY UPDATE ...
, or even better a MERGE
statement such as:
MERGE INTO table-name
USING table-ref
AS name
ON cond
WHEN NOT MATCHED THEN INSERT
WHEN MATCHED THEN UPDATE
The exact syntax depends on your flavour of SQL.
Use a staging table that you overwrite, then write a simple MySQL trigger on that staging table so that it runs INSERT INTO target_table ... ON DUPLICATE KEY UPDATE for each inserted row, as sketched below.
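A sketch of such a trigger, created once up front (here via pymysql, but any MySQL client works); the staging/target table and column names (myTable_staging, myTable, id, counter) are assumptions:

import pymysql

# One-off DDL: every row that lands in the staging table is upserted into
# the target table, incrementing the existing counter on a key collision.
create_trigger = """
CREATE TRIGGER staging_upsert
AFTER INSERT ON myTable_staging
FOR EACH ROW
  INSERT INTO myTable (id, counter)
  VALUES (NEW.id, NEW.counter)
  ON DUPLICATE KEY UPDATE counter = counter + NEW.counter
"""

conn = pymysql.connect(host="hostname", user="xyz", password="pwd", database="myDB")
try:
    with conn.cursor() as cur:
        cur.execute(create_trigger)
finally:
    conn.close()

Keep in mind that Spark's overwrite mode drops and recreates the table by default, which would also drop the trigger; writing with .option("truncate", "true") (or clearing the staging table yourself) keeps the trigger in place.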
Move your Spark DataFrame to a pandas
DataFrame and write your upsert
query there using sqlalchemy
and raw queries, as in the sketch below.
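A sketch of that route, assuming SQLAlchemy 1.4+ with the pymysql driver; the table and column names are again placeholders:

from sqlalchemy import create_engine, text

# Connection string, table and column names are assumptions for this sketch.
engine = create_engine("mysql+pymysql://xyz:pwd@hostname/myDB")

# Bring the (reasonably small) Spark DataFrame to the driver as pandas;
# cast numpy scalars to native Python types first if your driver complains.
pdf = df.select("id", "counter").toPandas()

upsert_sql = text("""
    INSERT INTO myTable (id, counter)
    VALUES (:id, :counter)
    ON DUPLICATE KEY UPDATE counter = counter + VALUES(counter)
""")

# engine.begin() opens a transaction and commits on success; passing a list
# of parameter dicts runs the statement once per row (executemany).
with engine.begin() as conn:
    conn.execute(upsert_sql, pdf.to_dict(orient="records"))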
Create a pipeline using Spark Streaming backed by Apache Kafka, then use a tool with JDBC upsert functionality, such as Kafka Connect, to upsert
directly into your target table, or use Kafka Connect to upsert
from a staging table into the target table. Here's some reading: https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html#idempotent-writes
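For that route, the Confluent JDBC sink connector supports insert.mode=upsert, which translates to INSERT ... ON DUPLICATE KEY UPDATE on MySQL. A rough sketch of registering such a connector through the Kafka Connect REST API (connector name, topic, key field and Connect host are assumptions):

import requests

# Register a JDBC sink connector in upsert mode via the Kafka Connect REST API.
connector = {
    "name": "mytable-upsert-sink",           # placeholder connector name
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://hostname/myDB?user=xyz&password=pwd",
        "topics": "mytable-updates",          # placeholder topic
        "table.name.format": "myTable",
        "insert.mode": "upsert",              # upsert instead of plain insert
        "pk.mode": "record_value",            # take the key from the record value
        "pk.fields": "id",                    # placeholder primary-key column
    },
}

resp = requests.post("http://connect-host:8083/connectors", json=connector)
resp.raise_for_status()

Note that the connector's upsert replaces the stored column values with the incoming record's values rather than adding them together, so the increment would have to be computed before the record reaches Kafka.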
Upvotes: 4
Reputation: 2109
A workaround is to insert the data into a staging table and then migrate it into the final tables using a SQL statement executed by the driver program. Then you can use any valid SQL syntax supported by your database provider.
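A sketch of that pattern, reusing the url and df from the question and assuming a staging table called myTable_staging, illustrative column names (id, counter), and the pymysql driver:

import pymysql

# 1) Overwrite the staging table with the DataFrame contents.
df.write.jdbc(url=url, table="myTable_staging", mode="overwrite")

# 2) From the driver, merge staging into the target table with plain SQL,
#    adding the staged value to the existing one on a key collision.
merge_sql = """
    INSERT INTO myTable (id, counter)
    SELECT s.id, s.counter
    FROM myTable_staging AS s
    ON DUPLICATE KEY UPDATE counter = myTable.counter + s.counter
"""

conn = pymysql.connect(host="hostname", user="xyz", password="pwd", database="myDB")
try:
    with conn.cursor() as cur:
        cur.execute(merge_sql)
    conn.commit()
finally:
    conn.close()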
Upvotes: 1