Marc

Reputation: 123

PySpark The conversion of a datetime2 data type to a datetime data type resulted in an out-of-range value

Okay so I am trying to write a dataframe from PySpark to an Azure SQL database, but am running into an issue with out-of-range datetime values. I am aware of the difference in the range of values supported by DATETIME and DATETIME2.

My table is defined as follows:

CREATE TABLE [dbo].[DimTour]
(
    [TourSk] BIGINT NOT NULL,
    [TourBk] INT NOT NULL,
    [TourType] VARCHAR(20) NOT NULL,
    [RequestedDateTimeUTC] DATETIME2 NOT NULL,
    [ScheduledDateTimeUTC] DATETIME2 NOT NULL,
    [TourStatus]  VARCHAR(20) NOT NULL,
    CONSTRAINT [PK_DimTour] PRIMARY KEY CLUSTERED([TourSk] ASC)
);

When I first deploy this table to Azure, the date columns are DATETIME2, and the data read into my dataframe from the source is also in DATETIME2 format. However, when I try to write the dataframe to the SQL database I get the following error: "The conversion of a datetime2 data type to a datetime data type resulted in an out-of-range value." And when I then look at my SQL database table, the type has switched from DATETIME2 to DATETIME. I am aware that this error stems from the year-0001 values, but why is the data being converted to DATETIME at all, and how can I fix this?

Here is how I am writing the data:

dimTour.write.mode('Overwrite').jdbc(url=jdbcUrl, table='dbo.DimTour', properties=connectionProperties)
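
For illustration, a hypothetical row like the following is representable as DATETIME2 but falls below DATETIME's minimum of 1753-01-01 (the column names match my table; the values are made up):

from datetime import datetime
from pyspark.sql import Row

# Hypothetical row: a year-0001 timestamp fits DATETIME2 (minimum 0001-01-01)
# but not DATETIME (minimum 1753-01-01), which is what triggers the error.
sample = spark.createDataFrame([
    Row(TourSk=1, TourBk=1, TourType='InPerson',
        RequestedDateTimeUTC=datetime(1, 1, 1),
        ScheduledDateTimeUTC=datetime(2019, 6, 1, 14, 30),
        TourStatus='Scheduled')
])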

Upvotes: 2

Views: 3332

Answers (1)

Matthias Langer

Reputation: 1024

The problem is that Spark maps timestamps to DATETIME by default. Because you write with mode('Overwrite'), Spark drops and recreates the table using that default mapping, which is why your DATETIME2 columns come back as DATETIME. You can override the mapping by registering a custom org.apache.spark.sql.jdbc.JdbcDialect that maps TimestampType to DATETIME2 instead, like so:

import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.TimestampType

/**
 * Copied from [[https://github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala MsSqlServerDialect]]
 *
 * This implementation differs from [[org.apache.spark.sql.jdbc.MsSqlServerDialect]] only insofar
 * as [[TimestampType]] is mapped to `DATETIME2` instead of `DATETIME`. This is preferable because,
 * among other limitations, `DATETIME` cannot represent dates before 1753-01-01.
 */
object CustomMsSqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (typeName.contains("datetimeoffset")) {
      // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
      Option(StringType)
    } else {
      None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME2", java.sql.Types.TIMESTAMP))
    case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
    case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
    case _ => None
  }

  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}

To register the custom dialect, place

import org.apache.spark.sql.jdbc.JdbcDialects

// ...
JdbcDialects.registerDialect(CustomMsSqlServerDialect)
// ...

somewhere in your initialization code, ideally before creating the Spark session.
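
Since the question writes from PySpark, note that the dialect has to be compiled into a jar that is on the cluster's classpath (for example via spark.jars), and registration then has to happen on the JVM side. A rough sketch of doing that through the py4j gateway, assuming the object above was compiled under a hypothetical package com.example:

# Sketch only: relies on Spark/py4j internals (_jvm) and on the usual encoding
# of a Scala `object`, which compiles to a class "<Name>$" holding its
# singleton instance in the static field MODULE$.
jvm = spark.sparkContext._jvm

dialect_class = getattr(jvm.com.example, "CustomMsSqlServerDialect$")
dialect = getattr(dialect_class, "MODULE$")

jvm.org.apache.spark.sql.jdbc.JdbcDialects.registerDialect(dialect)

# From PySpark this can only run after the SparkContext exists, but it still
# needs to happen before the JDBC write:
# dimTour.write.mode('Overwrite').jdbc(url=jdbcUrl, table='dbo.DimTour',
#                                      properties=connectionProperties)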

Upvotes: 4
