Reputation: 123
Okay, so I am trying to write a dataframe from PySpark to an Azure SQL database, but am running into an issue with out-of-range datetime values. I am aware of the difference in the range of values supported by datetime and datetime2.
My table is defined as follows:
CREATE TABLE [dbo].[DimTour]
(
    [TourSk] BIGINT NOT NULL,
    [TourBk] INT NOT NULL,
    [TourType] VARCHAR(20) NOT NULL,
    [RequestedDateTimeUTC] DATETIME2 NOT NULL,
    [ScheduledDateTimeUTC] DATETIME2 NOT NULL,
    [TourStatus] VARCHAR(20) NOT NULL,
    CONSTRAINT [PK_DimTour] PRIMARY KEY CLUSTERED([TourSk] ASC)
);
When I first upload this table to Azure the date columns are DATETIME2, and the data read into my dataframe from the data source is also DATETIME2. However, when I try to write the dataframe to the SQL database I get the following error: "The conversion of a datetime2 data type to a datetime data type resulted in an out-of-range value." And when I then go look at my SQL database table, the type has switched from DATETIME2 to DATETIME. I am aware that this error stems from the year-0001 values, but why is the data ever attempting to convert to DATETIME, and how can I fix it?
Here is how I am writing the data:
dimTour.write.mode('Overwrite').jdbc(url=jdbcUrl, table='dbo.DimTour', properties=connectionProperties)
Upvotes: 2
Views: 3332
Reputation: 1024
The problem is that Spark maps timestamps to DATETIME by default. You can override this by registering a custom org.apache.spark.sql.jdbc.JdbcDialect that maps them to DATETIME2 instead, like so:
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.TimestampType

/**
 * Copied from [[https://github.com/apache/spark/blob/v2.4.0/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala MsSqlServerDialect]]
 *
 * This implementation differs from [[org.apache.spark.sql.jdbc.MsSqlServerDialect]] only insofar
 * as [[TimestampType]] is mapped to `DATETIME2` instead of `DATETIME`. This is preferable because,
 * among other limitations, `DATETIME` cannot represent dates before 1753-01-01.
 */
object CustomMsSqlServerDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (typeName.contains("datetimeoffset")) {
      // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
      Option(StringType)
    } else {
      None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME2", java.sql.Types.TIMESTAMP))
    case StringType    => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
    case BooleanType   => Some(JdbcType("BIT", java.sql.Types.BIT))
    case _             => None
  }

  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
To register the custom dialect, place
import org.apache.spark.sql.jdbc.JdbcDialects
// ...
JdbcDialects.registerDialect(CustomMsSqlServerDialect)
// ...
somewhere in your initialization code, ideally before creating the Spark session.
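For completeness, here is a minimal end-to-end sketch (Scala) of how the pieces fit together. The WriteDimTour wrapper, the parquet source path, and the connection placeholders are made up for illustration and stand in for whatever your actual jdbcUrl, connectionProperties, and dimTour look like:
import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.JdbcDialects

// Assumes CustomMsSqlServerDialect (defined above) is on the classpath.
object WriteDimTour {
  def main(args: Array[String]): Unit = {
    // Register the custom dialect before any JDBC reads or writes.
    JdbcDialects.registerDialect(CustomMsSqlServerDialect)

    val spark = SparkSession.builder().appName("WriteDimTour").getOrCreate()

    // Hypothetical source; replace with however dimTour is actually built.
    val dimTour = spark.read.parquet("/mnt/staging/dim_tour")

    // Placeholder connection details.
    val jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", "<user>")
    connectionProperties.setProperty("password", "<password>")
    connectionProperties.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

    // With the dialect registered, the overwrite recreates the timestamp columns
    // as DATETIME2, so year-0001 values no longer trigger the out-of-range error.
    dimTour.write.mode("overwrite").jdbc(jdbcUrl, "dbo.DimTour", connectionProperties)

    spark.stop()
  }
}
Since dialect registration happens on the JVM, if the rest of your pipeline is in PySpark you would compile the dialect into a JAR, attach it to your cluster, and run the registration from JVM-side code before the write; the write call itself can stay exactly as in the question.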
Upvotes: 4