Keerikkattu Chellappan

Reputation: 515

PySpark Milliseconds of TimeStamp

I am trying to get the difference between two timestamp columns, but the milliseconds are gone.

How can I correct this?

import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"

data = [
    (1, '2018-07-25 17:15:06.39', '2018-07-25 17:15:06.377'),
    (2, '2018-07-25 11:12:49.317', '2018-07-25 11:12:48.883')
]

df = spark.createDataFrame(data, ['ID', 'max_ts', 'min_ts']) \
    .withColumn('diff', F.unix_timestamp('max_ts', format=timeFmt)
                - F.unix_timestamp('min_ts', format=timeFmt))
df.show(truncate=False)

Upvotes: 14

Views: 36125

Answers (6)

bolla

Reputation: 369

When you cannot guarantee the exact format of the sub-seconds (length? trailing zeros?), I propose the following little algorithm, which should work for all lengths and formats:

Algorithm

import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
current_col = "time"

# extract the digits after the decimal point and count how many there are
df = df.withColumn("subsecond_string", F.substring_index(current_col, '.', -1))
df = df.withColumn("subsecond_length", F.length(F.col("subsecond_string")))
# the divisor is 10 to the power of the number of sub-second digits
df = df.withColumn("divisor", F.pow(10, "subsecond_length"))
df = df.withColumn("subseconds", F.col("subsecond_string").cast("int") / F.col("divisor"))
# Putting it all together: whole seconds plus the fractional part
df = df.withColumn("timestamp_subsec", F.unix_timestamp(current_col, format=timeFmt) + F.col("subseconds"))

Based on the length of the subsecond-string, an appropriate divisor is calculated (10 to the power of the length of the substring).

Dropping the superfluous columns afterwards should not be a problem.
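
For example, the helper columns could be removed afterwards with a single drop (a minimal sketch, assuming the column names used above):

df = df.drop("subsecond_string", "subsecond_length", "divisor", "subseconds")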

Demonstration

My example result looks like this:

+----------------------+----------------+----------------+-------+----------+----------------+
|time                  |subsecond_string|subsecond_length|divisor|subseconds|timestamp_subsec|
+----------------------+----------------+----------------+-------+----------+----------------+
|2019-04-02 14:34:16.02|02              |2               |100.0  |0.02      |1.55420845602E9 |
|2019-04-02 14:34:16.03|03              |2               |100.0  |0.03      |1.55420845603E9 |
|2019-04-02 14:34:16.04|04              |2               |100.0  |0.04      |1.55420845604E9 |
|2019-04-02 14:34:16.05|05              |2               |100.0  |0.05      |1.55420845605E9 |
|2019-04-02 14:34:16.06|06              |2               |100.0  |0.06      |1.55420845606E9 |
|2019-04-02 14:34:16.07|07              |2               |100.0  |0.07      |1.55420845607E9 |
|2019-04-02 14:34:16.08|08              |2               |100.0  |0.08      |1.55420845608E9 |
|2019-04-02 14:34:16.09|09              |2               |100.0  |0.09      |1.55420845609E9 |
|2019-04-02 14:34:16.1 |1               |1               |10.0   |0.1       |1.5542084561E9  |
|2019-04-02 14:34:16.11|11              |2               |100.0  |0.11      |1.55420845611E9 |
|2019-04-02 14:34:16.12|12              |2               |100.0  |0.12      |1.55420845612E9 |
|2019-04-02 14:34:16.13|13              |2               |100.0  |0.13      |1.55420845613E9 |
|2019-04-02 14:34:16.14|14              |2               |100.0  |0.14      |1.55420845614E9 |
|2019-04-02 14:34:16.15|15              |2               |100.0  |0.15      |1.55420845615E9 |
|2019-04-02 14:34:16.16|16              |2               |100.0  |0.16      |1.55420845616E9 |
|2019-04-02 14:34:16.17|17              |2               |100.0  |0.17      |1.55420845617E9 |
|2019-04-02 14:34:16.18|18              |2               |100.0  |0.18      |1.55420845618E9 |
|2019-04-02 14:34:16.19|19              |2               |100.0  |0.19      |1.55420845619E9 |
|2019-04-02 14:34:16.2 |2               |1               |10.0   |0.2       |1.5542084562E9  |
|2019-04-02 14:34:16.21|21              |2               |100.0  |0.21      |1.55420845621E9 |
+----------------------+----------------+----------------+-------+----------+----------------+

Upvotes: 1

Troy

Reputation: 548

Assuming you already have a dataframe with columns of timestamp type:

from datetime import datetime
import pyspark.sql.functions as F

data = [
    (1, datetime(2018, 7, 25, 17, 15, 6, 390000), datetime(2018, 7, 25, 17, 15, 6, 377000)),
    (2, datetime(2018, 7, 25, 11, 12, 49, 317000), datetime(2018, 7, 25, 11, 12, 48, 883000))
]

df = spark.createDataFrame(data, ['ID', 'max_ts','min_ts'])
df.printSchema()

# root
#  |-- ID: long (nullable = true)
#  |-- max_ts: timestamp (nullable = true)
#  |-- min_ts: timestamp (nullable = true)

You can get the time in seconds by casting the timestamp-type column to a double type, or in milliseconds by multiplying that result by 1000 (and optionally casting to long if you want an integer). For example

df.select(
    F.col('max_ts').cast('double').alias('time_in_seconds'),
    (F.col('max_ts').cast('double') * 1000).cast('long').alias('time_in_milliseconds'),
).toPandas()

#     time_in_seconds  time_in_milliseconds
# 0    1532538906.390         1532538906390
# 1    1532517169.317         1532517169317

Finally, if you want the difference between the two times in milliseconds, you could do:

df.select(
    ((F.col('max_ts').cast('double') - F.col('min_ts').cast('double')) * 1000).cast('long').alias('diff_in_milliseconds'),
).toPandas()

#    diff_in_milliseconds
# 0                    13
# 1                   434

I'm doing this on PySpark 2.4.2. There is no need to use string concatenation whatsoever.

Upvotes: 10

Adeeb

Reputation: 51

Reason: pyspark's to_timestamp parses only down to seconds, while TimestampType has the ability to hold milliseconds.

The following workaround may work:

If the timestamp pattern contains S, invoke a UDF to build an 'INTERVAL ... MILLISECONDS' string to use in an expression.

from pyspark.sql.functions import to_timestamp, expr

ts_pattern = "yyyy-MM-dd HH:mm:ss:SSS"
my_col_name = "time_with_ms"

# get the time down to seconds only (to_timestamp drops the milliseconds)
df = df.withColumn(my_col_name, to_timestamp(df["updated_date_col2"], ts_pattern))

# if the pattern contains sub-second digits, add the milliseconds back as an interval
if "S" in ts_pattern:
    df = df.withColumn(my_col_name, df[my_col_name] + expr("INTERVAL 256 MILLISECONDS"))

To get the INTERVAL 256 MILLISECONDS string we may use a Java UDF:

df = df.withColumn(my_col_name, df[my_col_name] + expr(getIntervalStringUDF(df[my_col_name], ts_pattern)))

Inside the UDF, getIntervalStringUDF(String timeString, String pattern), the steps are (a rough sketch follows below):

  1. Use SimpleDateFormat to parse the date according to the pattern
  2. Return the formatted date as a string using the pattern "'INTERVAL 'SSS' MILLISECONDS'"
  3. Return 'INTERVAL 0 MILLISECONDS' on parse/format exceptions
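
The answer describes this helper in Java; purely as an illustration, a Python equivalent of those three steps might look roughly like this (the function name and pattern handling are assumptions, not part of the original answer):

from datetime import datetime

def get_interval_string(time_string):
    # hypothetical sketch: parse the string (counterpart of SimpleDateFormat
    # with the pattern "yyyy-MM-dd HH:mm:ss:SSS") and build the interval text
    try:
        parsed = datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S:%f")
        return "INTERVAL {} MILLISECONDS".format(parsed.microsecond // 1000)
    except (ValueError, TypeError):
        # step 3: fall back to a zero interval on parse errors
        return "INTERVAL 0 MILLISECONDS"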

Please refer to pyspark to_timestamp does not include milliseconds.

Upvotes: 3

cm06kb

Reputation: 11

Unlike @kaichi, I did not find that trailing zeros were truncated by the substring_index command, so multiplying the milliseconds by ten is not necessary and can give you the wrong answer: for example, if the milliseconds are originally 099, they would become 990. Furthermore, you may also want to handle timestamps that have zero milliseconds. To handle both of these situations, I have modified @kaichi's answer to give the following as the difference between two timestamps in milliseconds:

import pyspark.sql.functions as f

# tmpColumn holds the column name and timeFmt the timestamp format, as in @kaichi's answer
df = (
    df
    .withColumn('tmpLongColumn', f.substring_index(tmpColumn, '.', -1).cast('long'))
    .withColumn(
        'tmpLongColumn',
        f.when(f.col('tmpLongColumn').isNull(), 0.0)
        .otherwise(f.col('tmpLongColumn')))
    .withColumn(
        tmpColumn,
        (f.unix_timestamp(tmpColumn, format=timeFmt) * 1000 + f.col('tmpLongColumn')))
    .drop('tmpLongColumn'))

Upvotes: 1

kaichi

Reputation: 101

The answer from Tanjin doesn't work when the values are of type timestamp and the milliseconds are round numbers (like 390, 500). Python cuts the trailing 0, and the timestamp from the example then looks like this: 2018-07-25 17:15:06.39.

The problem is the hardcoded value in F.substring('max_ts', -3, 3). If the 0 at the end is missing, the substring picks up the wrong characters.

To convert tmpColumn of type timestamp column to tmpLongColumn of type long I used this snippet:

import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd HH:mm:ss.SSS"

df = df \
    .withColumn('tmpLongColumn', F.substring_index('tmpColumn', '.', -1).cast('float')) \
    .withColumn('tmpLongColumn', F.when(F.col('tmpLongColumn') < 100, F.col('tmpLongColumn') * 10)
                .otherwise(F.col('tmpLongColumn')).cast('long')) \
    .withColumn('tmpLongColumn', F.unix_timestamp('tmpColumn', format=timeFmt) * 1000 + F.col('tmpLongColumn'))

The first transformation extracts the substring containing the milliseconds. Next, if the value is less than 100, it is multiplied by 10. Finally, the timestamp is converted and the milliseconds are added.
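
As a rough check of the millisecond handling only (the data and column name here are purely illustrative):

import pyspark.sql.functions as F

demo = spark.createDataFrame(
    [('2018-07-25 17:15:06.39',), ('2018-07-25 17:15:06.390',)], ['tmpColumn'])
demo = demo \
    .withColumn('ms', F.substring_index('tmpColumn', '.', -1).cast('float')) \
    .withColumn('ms', F.when(F.col('ms') < 100, F.col('ms') * 10)
                .otherwise(F.col('ms')).cast('long'))
demo.show(truncate=False)
# both rows should end up with ms = 390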

Upvotes: 2

Tanjin

Reputation: 2452

That's the intended behavior for unix_timestamp - the source code docstring clearly states that it only returns seconds, so the milliseconds component is dropped when doing the calculation.

If you want that calculation, you can use the substring function to extract the milliseconds, add them back to the seconds, and then take the difference. See the example below. Please note that this assumes fully formed data, i.e. the milliseconds are always present in full (all 3 digits):

import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
data = [
    (1, '2018-07-25 17:15:06.390', '2018-07-25 17:15:06.377'),  # note the '390'
    (2, '2018-07-25 11:12:49.317', '2018-07-25 11:12:48.883')
]

df = spark.createDataFrame(data, ['ID', 'max_ts', 'min_ts'])\
    .withColumn('max_milli', F.unix_timestamp('max_ts', format=timeFmt) + F.substring('max_ts', -3, 3).cast('float')/1000)\
    .withColumn('min_milli', F.unix_timestamp('min_ts', format=timeFmt) + F.substring('min_ts', -3, 3).cast('float')/1000)\
    .withColumn('diff', (F.col('max_milli') - F.col('min_milli')).cast('float') * 1000)

df.show(truncate=False)

+---+-----------------------+-----------------------+----------------+----------------+---------+
|ID |max_ts                 |min_ts                 |max_milli       |min_milli       |diff     |
+---+-----------------------+-----------------------+----------------+----------------+---------+
|1  |2018-07-25 17:15:06.390|2018-07-25 17:15:06.377|1.53255330639E9 |1.532553306377E9|13.000011|
|2  |2018-07-25 11:12:49.317|2018-07-25 11:12:48.883|1.532531569317E9|1.532531568883E9|434.0    |
+---+-----------------------+-----------------------+----------------+----------------+---------+

Upvotes: 6
