Reputation: 515
I am trying to get the difference between two timestamp columns, but the milliseconds are gone.
How can I correct this?
import pyspark.sql.functions as F
timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
data = [
(1, '2018-07-25 17:15:06.39','2018-07-25 17:15:06.377'),
(2,'2018-07-25 11:12:49.317','2018-07-25 11:12:48.883')
]
df = spark.createDataFrame(data, ['ID', 'max_ts', 'min_ts']) \
    .withColumn('diff', F.unix_timestamp('max_ts', format=timeFmt) - F.unix_timestamp('min_ts', format=timeFmt))
df.show(truncate = False)
Upvotes: 14
Views: 36125
Reputation: 369
When you cannot guarantee the exact format of the sub-seconds (length? trailing zeros?), I propose the following little algorithm, which should work for all lengths and formats:
import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
current_col = "time"

# everything after the last '.' is the sub-second string
df = df.withColumn("subsecond_string", F.substring_index(current_col, '.', -1))
df = df.withColumn("subsecond_length", F.length(F.col("subsecond_string")))
# 10 to the power of the substring length, e.g. '39' -> divisor 100
df = df.withColumn("divisor", F.pow(10, "subsecond_length"))
df = df.withColumn("subseconds", F.col("subsecond_string").cast("int") / F.col("divisor"))
# Putting it all together: whole seconds from unix_timestamp plus the sub-second fraction
df = df.withColumn("timestamp_subsec", F.unix_timestamp(current_col, format=timeFmt) + F.col("subseconds"))
Based on the length of the subsecond string, an appropriate divisor is calculated (10 to the power of the length of the substring).
Dropping the superfluous helper columns afterwards should not be a problem (a small sketch follows the output below).
My exemplary result looks like this:
+----------------------+----------------+----------------+-------+----------+----------------+
|time |subsecond_string|subsecond_length|divisor|subseconds|timestamp_subsec|
+----------------------+----------------+----------------+-------+----------+----------------+
|2019-04-02 14:34:16.02|02 |2 |100.0 |0.02 |1.55420845602E9 |
|2019-04-02 14:34:16.03|03 |2 |100.0 |0.03 |1.55420845603E9 |
|2019-04-02 14:34:16.04|04 |2 |100.0 |0.04 |1.55420845604E9 |
|2019-04-02 14:34:16.05|05 |2 |100.0 |0.05 |1.55420845605E9 |
|2019-04-02 14:34:16.06|06 |2 |100.0 |0.06 |1.55420845606E9 |
|2019-04-02 14:34:16.07|07 |2 |100.0 |0.07 |1.55420845607E9 |
|2019-04-02 14:34:16.08|08 |2 |100.0 |0.08 |1.55420845608E9 |
|2019-04-02 14:34:16.09|09 |2 |100.0 |0.09 |1.55420845609E9 |
|2019-04-02 14:34:16.1 |1 |1 |10.0 |0.1 |1.5542084561E9 |
|2019-04-02 14:34:16.11|11 |2 |100.0 |0.11 |1.55420845611E9 |
|2019-04-02 14:34:16.12|12 |2 |100.0 |0.12 |1.55420845612E9 |
|2019-04-02 14:34:16.13|13 |2 |100.0 |0.13 |1.55420845613E9 |
|2019-04-02 14:34:16.14|14 |2 |100.0 |0.14 |1.55420845614E9 |
|2019-04-02 14:34:16.15|15 |2 |100.0 |0.15 |1.55420845615E9 |
|2019-04-02 14:34:16.16|16 |2 |100.0 |0.16 |1.55420845616E9 |
|2019-04-02 14:34:16.17|17 |2 |100.0 |0.17 |1.55420845617E9 |
|2019-04-02 14:34:16.18|18 |2 |100.0 |0.18 |1.55420845618E9 |
|2019-04-02 14:34:16.19|19 |2 |100.0 |0.19 |1.55420845619E9 |
|2019-04-02 14:34:16.2 |2 |1 |10.0 |0.2 |1.5542084562E9 |
|2019-04-02 14:34:16.21|21 |2 |100.0 |0.21 |1.55420845621E9 |
+----------------------+----------------+----------------+-------+----------+----------------+
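As noted above, the helper columns can then simply be dropped; a minimal sketch using the column names from the snippet above:
# keep only the original column and the sub-second-aware epoch value
df = df.drop("subsecond_string", "subsecond_length", "divisor", "subseconds")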
Upvotes: 1
Reputation: 548
Assuming you already have a dataframe with columns of timestamp type:
from datetime import datetime
import pyspark.sql.functions as F
data = [
(1, datetime(2018, 7, 25, 17, 15, 6, 390000), datetime(2018, 7, 25, 17, 15, 6, 377000)),
(2, datetime(2018, 7, 25, 11, 12, 49, 317000), datetime(2018, 7, 25, 11, 12, 48, 883000))
]
df = spark.createDataFrame(data, ['ID', 'max_ts','min_ts'])
df.printSchema()
# root
# |-- ID: long (nullable = true)
# |-- max_ts: timestamp (nullable = true)
# |-- min_ts: timestamp (nullable = true)
You can get the time in seconds by casting the timestamp-type column to a double
type, or in milliseconds by multiplying that result by 1000 (and optionally casting to long
if you want an integer).
For example
df.select(
F.col('max_ts').cast('double').alias('time_in_seconds'),
(F.col('max_ts').cast('double') * 1000).cast('long').alias('time_in_milliseconds'),
).toPandas()
# time_in_seconds time_in_milliseconds
# 0 1532538906.390 1532538906390
# 1 1532517169.317 1532517169317
Finally, if you want the difference between the two times in milliseconds, you could do:
df.select(
((F.col('max_ts').cast('double') - F.col('min_ts').cast('double')) * 1000).cast('long').alias('diff_in_milliseconds'),
).toPandas()
# diff_in_milliseconds
# 0 13
# 1 434
I'm doing this on PySpark 2.4.2. There is no need to use string concatenation whatsoever.
Upvotes: 10
Reputation: 51
The reason is that pyspark's to_timestamp parses only up to seconds, while TimestampType has the ability to hold milliseconds.
The following workaround may work:
If the timestamp pattern contains S, invoke a UDF to get the string 'INTERVAL MILLISECONDS' to use in an expression:
from pyspark.sql.functions import to_timestamp, expr

ts_pattern = "YYYY-MM-dd HH:mm:ss:SSS"
my_col_name = "time_with_ms"

# get the time down to seconds (to_timestamp drops the milliseconds)
df = df.withColumn(my_col_name, to_timestamp(df["updated_date_col2"], ts_pattern))
if 'S' in ts_pattern:
    df = df.withColumn(my_col_name, df[my_col_name] + expr("INTERVAL 256 MILLISECONDS"))
To get the INTERVAL 256 MILLISECONDS string we may use a Java UDF:
df = df.withColumn(my_col_name, df[my_col_name] + expr(getIntervalStringUDF(df[my_col_name], ts_pattern)))
Inside the UDF: getIntervalStringUDF(String timeString, String pattern)
Please refer to pyspark to_timestamp does not include milliseconds
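If a Java UDF is not at hand, a rough Python-side sketch of the same idea (reusing my_col_name and the ss:SSS-style pattern from the snippet above; parse_with_millis is a hypothetical helper, not part of the original answer) would be to parse the full string, milliseconds included, with a plain Python UDF:
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def parse_with_millis(s):
    # strptime's %f accepts 1-6 fractional digits, so '390' parses as 390 ms
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S:%f") if s else None

df = df.withColumn(my_col_name, parse_with_millis(df["updated_date_col2"]))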
Upvotes: 3
Reputation: 11
Unlike @kaichi I did not find that trailing zeros were truncated by the substring_index command, so multiplying the milliseconds by ten is not necessary and can give you the wrong answer; for example, a milliseconds value of 099 would become 990. Furthermore, you may also want to handle timestamps that have zero milliseconds. To handle both of these situations, I have modified @kaichi's answer to give the following as the difference between two timestamps, in milliseconds:
import pyspark.sql.functions as f

# tmpColumn holds the name of the timestamp column; timeFmt is the format string used above
df = (
    df
    .withColumn('tmpLongColumn', f.substring_index(tmpColumn, '.', -1).cast('long'))
    .withColumn(
        'tmpLongColumn',
        # timestamps with zero milliseconds have no sub-second part, so the cast yields null
        f.when(f.col('tmpLongColumn').isNull(), 0.0)
        .otherwise(f.col('tmpLongColumn')))
    .withColumn(
        tmpColumn,
        (f.unix_timestamp(tmpColumn, format=timeFmt)*1000 + f.col('tmpLongColumn')))
    .drop('tmpLongColumn'))
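To get the actual difference, a rough sketch (assuming the block above has been applied once per timestamp column, producing hypothetical epoch-millisecond columns max_ts_ms and min_ts_ms) would be:
# max_ts_ms and min_ts_ms are hypothetical names for the two converted columns
df = df.withColumn('diff_in_milliseconds', f.col('max_ts_ms') - f.col('min_ts_ms'))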
Upvotes: 1
Reputation: 101
The answer from Tanjin doesn't work when the values are of type timestamp and the milliseconds are round numbers (like 390, 500). Python would cut the 0 at the end and the timestamp from the example would look like this: 2018-07-25 17:15:06.39. The problem is the hardcoded value in F.substring('max_ts', -3, 3). If the 0 at the end is missing then the substring goes wild.
To convert tmpColumn of type timestamp to tmpLongColumn of type long I used this snippet:
import pyspark.sql.functions as F

timeFmt = "yyyy-MM-dd HH:mm:ss.SSS"

df = df \
    .withColumn('tmpLongColumn', F.substring_index('tmpColumn', '.', -1).cast('float')) \
    .withColumn('tmpLongColumn', F.when(F.col('tmpLongColumn') < 100, F.col('tmpLongColumn')*10)
                                  .otherwise(F.col('tmpLongColumn')).cast('long')) \
    .withColumn('tmpLongColumn', F.unix_timestamp('tmpColumn', format=timeFmt)*1000 + F.col('tmpLongColumn'))
The first transformation extracts the substring containing the milliseconds. Next, if the value is less than 100, the trailing zero was dropped, so multiply it by 10. Finally, convert the timestamp to epoch milliseconds and add the extracted milliseconds.
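As a small illustration (a sketch, not part of the original snippet), here is the second step applied to the truncated string from the example above:
import pyspark.sql.functions as F

demo = spark.createDataFrame([('2018-07-25 17:15:06.39',)], ['tmpColumn'])
demo = demo \
    .withColumn('ms_raw', F.substring_index('tmpColumn', '.', -1).cast('float')) \
    .withColumn('ms_fixed', F.when(F.col('ms_raw') < 100, F.col('ms_raw')*10)
                             .otherwise(F.col('ms_raw')).cast('long'))
demo.show()
# ms_raw = 39.0 -> ms_fixed = 390: the trailing zero that was cut off is restored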
Upvotes: 2
Reputation: 2452
That's the intended behavior for unix_timestamp - it clearly states in the source code docstring that it only returns seconds, so the milliseconds component is dropped when doing the calculation.
If you want that calculation, you can use the substring function to get the milliseconds back and then do the difference. See the example below. Please note that this assumes fully formed data, for example that the milliseconds are filled in entirely (all 3 digits):
import pyspark.sql.functions as F
timeFmt = "yyyy-MM-dd' 'HH:mm:ss.SSS"
data = [
(1, '2018-07-25 17:15:06.390', '2018-07-25 17:15:06.377'), # note the '390'
(2, '2018-07-25 11:12:49.317', '2018-07-25 11:12:48.883')
]
df = spark.createDataFrame(data, ['ID', 'max_ts', 'min_ts'])\
.withColumn('max_milli', F.unix_timestamp('max_ts', format=timeFmt) + F.substring('max_ts', -3, 3).cast('float')/1000)\
.withColumn('min_milli', F.unix_timestamp('min_ts', format=timeFmt) + F.substring('min_ts', -3, 3).cast('float')/1000)\
.withColumn('diff', (F.col('max_milli') - F.col('min_milli')).cast('float') * 1000)
df.show(truncate=False)
+---+-----------------------+-----------------------+----------------+----------------+---------+
|ID |max_ts |min_ts |max_milli |min_milli |diff |
+---+-----------------------+-----------------------+----------------+----------------+---------+
|1 |2018-07-25 17:15:06.390|2018-07-25 17:15:06.377|1.53255330639E9 |1.532553306377E9|13.000011|
|2 |2018-07-25 11:12:49.317|2018-07-25 11:12:48.883|1.532531569317E9|1.532531568883E9|434.0 |
+---+-----------------------+-----------------------+----------------+----------------+---------+
Upvotes: 6