Reputation: 477
I have a DataFrame as follows:
Name   starttime            endtime
user1  2019-08-02 03:34:45  2019-08-02 03:52:03
user2  2019-08-13 13:34:10  2019-08-13 14:02:10
I would like to check whether the endtime bleeds into the next hour and, if it does, update it to the last minute and second of the current hour, as shown below:
Name   starttime            endtime
user1  2019-08-02 03:34:45  2019-08-02 03:52:03
user2  2019-08-13 13:34:10  2019-08-13 13:59:59
I can do the check and replacement with a UDF, as below, but would prefer not to use one.
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

def adjust_end_hour(date):
    return date.replace(minute=59, second=59)

adjust_end_hour_udf = udf(adjust_end_hour, TimestampType())

df = df.\
    filter(df.endtime > adjust_end_hour_udf(df.starttime)).\
    withColumn('endtime', adjust_end_hour_udf(df.starttime))
How can I do that without using UDF in pyspark?
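For reference, the per-row logic of that UDF can be exercised in plain Python without Spark (`datetime` stands in for the Spark timestamp; the sample values are taken from the question):

```python
from datetime import datetime

def adjust_end_hour(date):
    # Clamp a timestamp to the last second of its own hour,
    # mirroring what the UDF above does per row.
    return date.replace(minute=59, second=59)

start = datetime(2019, 8, 13, 13, 34, 10)
print(adjust_end_hour(start))  # 2019-08-13 13:59:59
```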
Upvotes: 0
Views: 1017
Reputation: 32650
Another solution is to truncate starttime to the hour, then add 59 seconds and 59 minutes using the SQL INTERVAL syntax, like this:
from pyspark.sql.functions import col, expr, when

adjust_expr = "date_trunc('hour', starttime) + INTERVAL 59 seconds + INTERVAL 59 minutes"

df.withColumn(
    "endtime",
    when(col("endtime") > expr(adjust_expr),
         expr(adjust_expr)
    ).otherwise(col("endtime"))
).show()
Gives:
+-----+-------------------+-------------------+
| name| starttime| endtime|
+-----+-------------------+-------------------+
|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|
|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
+-----+-------------------+-------------------+
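The INTERVAL expression above is plain timestamp arithmetic; a quick `datetime` sketch (outside Spark) shows what it evaluates to for the second row:

```python
from datetime import datetime, timedelta

start = datetime(2019, 8, 13, 13, 34, 10)

# date_trunc('hour', starttime): zero out minutes and seconds
truncated = start.replace(minute=0, second=0, microsecond=0)

# + INTERVAL 59 seconds + INTERVAL 59 minutes
cutoff = truncated + timedelta(minutes=59, seconds=59)

print(cutoff)  # 2019-08-13 13:59:59
```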
Upvotes: 1
Reputation: 43494
Assuming your DataFrame has the following schema:
df.printSchema()
#root
# |-- Name: string (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)
i.e. where starttime and endtime are both TimestampType().
You can check whether the endtime bleeds into the next hour by comparing the hour parts of starttime and endtime. If they are not equal¹, you need to truncate the end time.
from pyspark.sql.functions import col, hour

df.withColumn(
    "bleeds_into_next_hour",
    hour(col("endtime")) != hour(col("starttime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name| starttime| endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03| false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:02:10| true|
#+-----+-------------------+-------------------+---------------------+
This tells you which rows need to be modified. You can almost get to the desired output using date_trunc with the format parameter set to 'hour':
from pyspark.sql.functions import date_trunc, when

df.withColumn(
    "bleeds_into_next_hour",
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime",
    when(
        col("bleeds_into_next_hour"),
        date_trunc('hour', "endtime")
    ).otherwise(col("endtime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name| starttime| endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03| false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:00:00| true|
#+-----+-------------------+-------------------+---------------------+
All you have to do now is subtract 1 second from endtime. The easiest way is to convert to a unix_timestamp, subtract 1, and then convert back using from_unixtime.
from pyspark.sql.functions import from_unixtime, unix_timestamp

df.withColumn(
    "bleeds_into_next_hour",
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime",
    from_unixtime(
        unix_timestamp(
            when(
                col("bleeds_into_next_hour"),
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
).drop("bleeds_into_next_hour").show()
#+-----+-------------------+-------------------+
#| Name| starttime| endtime|
#+-----+-------------------+-------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:02|
#|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
#+-----+-------------------+-------------------+
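The subtract-one-second step works at the epoch level; in plain Python terms (timezone pinned to UTC here only to make the round trip deterministic, which Spark handles via the session timezone):

```python
from datetime import datetime, timezone

# date_trunc('hour', endtime) gave 14:00:00; back up one second
# via the epoch, as unix_timestamp(...) - 1 does in Spark.
truncated = datetime(2019, 8, 13, 14, 0, 0, tzinfo=timezone.utc)
epoch = int(truncated.timestamp())
adjusted = datetime.fromtimestamp(epoch - 1, tz=timezone.utc)

print(adjusted)  # 2019-08-13 13:59:59+00:00
```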
Putting it all together, without the intermediate column:
from pyspark.sql.functions import col, date_trunc, from_unixtime, hour, unix_timestamp, when

df = df.withColumn(
    "endtime",
    from_unixtime(
        unix_timestamp(
            when(
                hour(col("endtime")) != hour(col("starttime")),
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
)
Notes
¹ This assumes endtime is always greater than or equal to starttime. You can't compare the hour parts with > because hours wrap around at midnight (hour 23 is followed by hour 0).

Upvotes: 0