Cralle

Reputation: 118

Spark write Dataframe to SQL Server Table with Insert Identity On

I have a Spark Dataframe that I want to push to an SQL table on a remote server. The table has an Id column that is set as an identity column. The Dataframe I want to push also has an Id column, and I want to use those Ids in the SQL table without removing the identity option on the column.

I write the dataframe like this:

df.write.format("jdbc") \
      .mode(mode) \
      .option("url", jdbc_url) \
      .option("dbtable", table_name) \
      .option("user", jdbc_username) \
      .option("password", jdbc_password) \
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
      .save()

But I get the following response:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 41, 10.1.0.4, executor 0): java.sql.BatchUpdateException: Cannot insert explicit value for identity column in table 'Table' when IDENTITY_INSERT is set to OFF.

I have tried adding a query to the write, like this:

query = f"SET IDENTITY_INSERT Table ON;"
df.write.format("jdbc") \
      .mode(mode) \
      .option("url", jdbc_url) \
      .option("query", query) \
      .option("dbtable", table_name) \
      .option("user", jdbc_username) \
      .option("password", jdbc_password) \
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
      .save()

But that just throws an error:

IllegalArgumentException: Both 'dbtable' and 'query' can not be specified at the same time.

Or if I try to run a read with the query first, roughly like this (a sketch with the same connection options as above):
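
spark.read.format("jdbc") \
      .option("url", jdbc_url) \
      .option("query", query) \
      .option("user", jdbc_username) \
      .option("password", jdbc_password) \
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
      .load()

It fails with: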

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near the keyword 'SET'.

This must be because the query option only supports SELECT statements (Spark wraps the query in a subselect).

Is this possible to do in Spark, or would I need to use a different connector and combine setting IDENTITY_INSERT ON with regular INSERT INTO statements?

I would prefer a solution that lets me keep writing through the Spark context, but I am open to other solutions.

Upvotes: 4

Views: 4120

Answers (1)

MrRedbloX

Reputation: 11

One way to work around this issue is the following:

  • Save your dataframe as a temporary table in your database.
  • Set IDENTITY_INSERT to ON.
  • Insert the content of your temporary table into your real table.
  • Set IDENTITY_INSERT back to OFF.
  • Drop your temporary table.

Here's a pseudo-code example, reusing the connection variables from the question (one way to implement execute is sketched after the code):

tablename = "MyTable"
tmp_tablename = tablename + "tmp"

# 1. Write the dataframe to a temporary table ("overwrite" (re)creates it)
df.write.format("jdbc") \
      .mode("overwrite") \
      .option("url", jdbc_url) \
      .option("dbtable", tmp_tablename) \
      .option("user", jdbc_username) \
      .option("password", jdbc_password) \
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
      .save()

# 2. Copy the rows into the real table with IDENTITY_INSERT on, then clean up
columns = ','.join(df.columns)
query = f"""
  SET IDENTITY_INSERT {tablename} ON;
  INSERT INTO {tablename} ({columns})
  SELECT {columns} FROM {tmp_tablename};
  SET IDENTITY_INSERT {tablename} OFF;
  DROP TABLE {tmp_tablename};
"""
execute(query)  # e.g. with a Cursor from pyodbc to run raw SQL (sketch below)
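
For the execute step you can use pyodbc from the driver node. A minimal sketch, assuming an ODBC connection string whose driver name, server, database, and credentials are placeholders you would substitute for your environment:

import pyodbc

def execute(sql):
    # IDENTITY_INSERT is a session-level setting, so the SET, INSERT,
    # and cleanup statements must all run over this one connection.
    conn = pyodbc.connect(
        "DRIVER={ODBC Driver 17 for SQL Server};"
        "SERVER=your-server.example.com;"   # placeholder
        "DATABASE=YourDatabase;"            # placeholder
        "UID=your_user;PWD=your_password"   # placeholder
    )
    try:
        conn.cursor().execute(sql)
        conn.commit()  # pyodbc does not autocommit by default
    finally:
        conn.close()

This keeps the bulk write itself in Spark and only routes the small identity-juggling batch through a direct connection.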

Upvotes: 0
