Metadata

Reputation: 2083

How to specify timestamp values for bounds in pyspark dataframe?

I am trying to read a table from SQL Server and partition the read. Before reading the data, I want to get the values for lowerBound & upperBound as below.

# Parentheses let the chained calls span several lines.
boundsDF = (spark.read.format('jdbc')
                .option('url', url)
                .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
                .option('user', username)
                .option('password', password)
                # A subquery used as dbtable needs a closing quote and an alias.
                .option('dbtable', '(select min(updated_datetime) as mint, max(updated_datetime) as maxt from tablename) as bounds')
                .load())

I extracted the values like below from boundsDF:

maxdate = [x["maxt"] for x in boundsDF.rdd.collect()]
mindate = [x["mint"] for x in boundsDF.rdd.collect()]

And this is how I specified the timestamp columns while reading:

dataframe = (spark.read.format('jdbc')
                 .option('url', url)
                 .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
                 .option('user', user)
                 .option('password', password)
                 .option('dbtable', tablename)
                 .option('partitionColumn', timestamp_column)
                 .option('numPartitions', 3)
                 .option('lowerBound', mindate[0])
                 .option('upperBound', maxdate[0])
                 .option('fetchsize', 5000)
                 .load())

If I print the values of mindate & maxdate, this is what they look like:

mindate[0]: datetime.datetime(2010, 10, 4, 11, 54, 13, 543000)
maxdate[0]: datetime.datetime(2021, 3, 5, 17, 59, 45, 880000)

When I call dataframe.count(), I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 18.0 failed 1 times, most recent failure: Lost task 2.0 in stage 18.0 (TID 21, executor driver): com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.

Since I started using Spark, I have always used integer columns for my partition column. This is the first time I am using a timestamp column for partitioning the data.

Are mindate[0] & maxdate[0] in the right format to be specified in my read statement? Could anyone let me know whether I am implementing this correctly?

Upvotes: 1

Views: 1721

Answers (2)

downloaderfan

Reputation: 83

I ran across the same issue, but after some experimentation, I was able to find the solution.

It seems there exists a bug which was raised about 2 years ago but sadly remains open to this date: https://issues.apache.org/jira/browse/SPARK-34097

However, if you pass just the date (i.e. YYYY-MM-DD) instead of a full timestamp in lowerBound & upperBound, the code works.
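A minimal sketch of that workaround, using the two values printed in the question. The helper name to_date_bound is hypothetical, and pushing the upper bound forward by one day is my own assumption so that the last partition boundary is not earlier than the real maximum. Spark uses the bounds only to compute the partition stride, not to filter rows, so widening them should not drop data.

```python
from datetime import datetime, timedelta

def to_date_bound(ts: datetime) -> str:
    """Format a datetime as a date-only string (YYYY-MM-DD)."""
    return ts.strftime("%Y-%m-%d")

# Values copied from the question's printed output.
mindate = [datetime(2010, 10, 4, 11, 54, 13, 543000)]
maxdate = [datetime(2021, 3, 5, 17, 59, 45, 880000)]

lower_bound = to_date_bound(mindate[0])
# Assumption: move the upper bound to the next day so it covers the true maximum.
upper_bound = to_date_bound(maxdate[0] + timedelta(days=1))

print(lower_bound, upper_bound)
```

These strings would then replace mindate[0] and maxdate[0] in the read, i.e. .option('lowerBound', lower_bound) and .option('upperBound', upper_bound).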

Upvotes: 3

SQLpro

Reputation: 5131

The question is: which datatype do you use for this column in the SQL Server table?

  1. In SQL Server, TIMESTAMP is not a datetime datatype. It is an internal row version number (binary) and has nothing to do with temporal data.
  2. DATETIME is the older datatype for DATE + TIME (DATETIME2 is recommended for new work) and is limited to 3 decimal places for the second.
  3. DATETIME2, which replaces DATETIME, is the newer datatype to use for DATE + TIME, and lets you choose between 0 and 7 decimal places for the second.

Now two remarks:

  1. If you use TIMESTAMP, replace it with DATETIME2 with the required precision (7 by default).
  2. If you use DATETIME and you do not want to replace it with DATETIME2, you must specify only 3 digits for the decimal part of the second, but what I see in your code is mindate[0]: datetime.datetime(2010, 10, 4, 11, 54, 13, 543000), which carries 6 digits (microseconds).

DATETIME2 is more accurate than DATETIME, which is only accurate to about 3 ms (values are rounded to increments of .000, .003, or .007 seconds); this rounding can cause some queries to be interpreted incorrectly.
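To illustrate the digit count, here is a small sketch comparing Python's default string form of the question's minimum value with a form cut down to 3 fractional digits. The helper name to_millis_string is hypothetical. I am also assuming that a datetime passed to .option() is sent as its str() form. Whether the partition predicates Spark generates keep millisecond precision is not verified here, so treat this as an illustration of the format only.

```python
from datetime import datetime

def to_millis_string(ts: datetime) -> str:
    """Format a datetime with exactly 3 fractional-second digits.

    %f always yields 6 digits (microseconds); dropping the last 3
    truncates to milliseconds (it does not round).
    """
    return ts.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

# Value copied from the question's printed output.
mindate = datetime(2010, 10, 4, 11, 54, 13, 543000)

default_form = str(mindate)              # 6 fractional digits
millis_form = to_millis_string(mindate)  # 3 fractional digits

print(default_form)
print(millis_form)
```

The first string has more fractional digits than a DATETIME literal accepts, which is consistent with the "Conversion failed when converting date and/or time from character string" error in the question.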

Upvotes: 1
