Reputation: 118
I have a Spark DataFrame that I want to push to an SQL table on a remote server. The table has an Id column that is set as an identity column. The DataFrame I want to push also has an Id column, and I want to use those Ids in the SQL table, without removing the identity option from the column.
I write the dataframe like this:
df.write.format("jdbc") \
.mode(mode) \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", jdbc_username) \
.option("password", jdbc_password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.save()
But I get the following response:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 41, 10.1.0.4, executor 0): java.sql.BatchUpdateException: Cannot insert explicit value for identity column in table 'Table' when IDENTITY_INSERT is set to OFF.
I have tried adding a query option to the write, like:
query = "SET IDENTITY_INSERT Table ON;"
df.write.format("jdbc") \
.mode(mode) \
.option("url", jdbc_url) \
.option("query", query) \
.option("dbtable", table_name) \
.option("user", jdbc_username) \
.option("password", jdbc_password) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.save()
But that fails because the two options are mutually exclusive:
IllegalArgumentException: Both 'dbtable' and 'query' can not be specified at the same time.
Or if I try to run a read with the query first:
com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'SET'.
Presumably this is because the query option only supports SELECT statements.
Is this possible to do in Spark, or would I need to use a different connector and combine setting IDENTITY_INSERT on with regular INSERT INTO statements?
I would prefer a solution that allowed me to keep writing through the Spark context. But I am open to other solutions.
Upvotes: 4
Views: 4120
Reputation: 11
One way to work around this is to write the DataFrame to a temporary staging table (which has no identity column), then copy the rows into the target table over a plain SQL connection with IDENTITY_INSERT switched on. Here's a pseudo-code example:
tablename = "MyTable"
tmp_tablename = tablename + "tmp"
# Stage the data in a temporary table that has no identity column
df.write.format("jdbc").options(..., dbtable=tmp_tablename).save()
# Copy the staged rows into the real table with IDENTITY_INSERT enabled,
# then drop the staging table
columns = ','.join(df.columns)
query = f"""
SET IDENTITY_INSERT {tablename} ON;
INSERT INTO {tablename} ({columns})
SELECT {columns} FROM {tmp_tablename};
SET IDENTITY_INSERT {tablename} OFF;
DROP TABLE {tmp_tablename};
"""
execute(query)  # e.g. with a pyodbc Cursor; Spark's JDBC writer cannot run this batch
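The batch-building part of the pseudo code above can be sketched as a small helper. The pyodbc usage at the bottom is an assumption (the connection string `conn_str` and the table/column names are placeholders, not part of the original answer); note that SET IDENTITY_INSERT is session-scoped, so the whole batch must run on one connection, which is why it cannot go through Spark's JDBC writer:

```python
def identity_insert_batch(table, tmp_table, columns):
    """Build the T-SQL batch that copies staged rows into the
    identity table and then drops the staging table."""
    cols = ", ".join(columns)
    return (
        f"SET IDENTITY_INSERT {table} ON;\n"
        f"INSERT INTO {table} ({cols}) SELECT {cols} FROM {tmp_table};\n"
        f"SET IDENTITY_INSERT {table} OFF;\n"
        f"DROP TABLE {tmp_table};"
    )

# Hypothetical usage with pyodbc (conn_str is a placeholder):
# import pyodbc
# with pyodbc.connect(conn_str, autocommit=True) as conn:
#     conn.cursor().execute(
#         identity_insert_batch("MyTable", "MyTabletmp", df.columns))
```

SQL Server also only allows IDENTITY_INSERT to be ON for one table at a time per session, so if you stage several tables, run one batch per table.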
Upvotes: 0