NetRocks

Reputation: 477

Update Minute and Seconds value in Dataframe column using Pyspark

I have a DataFrame as follows:

Name    starttime               endtime
user1   2019-08-02 03:34:45   2019-08-02 03:52:03
user2   2019-08-13 13:34:10   2019-08-13 14:02:10
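
For reproducibility, a minimal sketch that builds this sample DataFrame (assuming an active SparkSession named spark; the strings are cast so that starttime and endtime end up as TimestampType):

from pyspark.sql.functions import to_timestamp

# Build the sample rows and cast the string columns to timestamps
df = spark.createDataFrame(
    [("user1", "2019-08-02 03:34:45", "2019-08-02 03:52:03"),
     ("user2", "2019-08-13 13:34:10", "2019-08-13 14:02:10")],
    ["Name", "starttime", "endtime"]
).withColumn("starttime", to_timestamp("starttime"))\
 .withColumn("endtime", to_timestamp("endtime"))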

I would like to check whether endtime bleeds into the next hour and, if it does, update it to the last minute and second of the current hour, as shown below.

Name    starttime               endtime
user1   2019-08-02 03:34:45   2019-08-02 03:52:03
user2   2019-08-13 13:34:10   2019-08-13 13:59:59

I can do the check and replace using a UDF as shown below, but would prefer not to use one.

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

def adjust_end_hour(date):
    # Set minute and second to 59 to get the last second of the current hour
    return date.replace(minute=59, second=59)

adjust_end_hour_udf = udf(adjust_end_hour, TimestampType())

df = df.\
   filter(df.endtime > adjust_end_hour_udf(df.starttime)).\
   withColumn('endtime', adjust_end_hour_udf(df.starttime))

How can I do that without using UDF in pyspark?

Upvotes: 0

Views: 1017

Answers (2)

blackbishop

Reputation: 32650

Another solution would be to truncate starttime to the hour, then add 59 seconds and 59 minutes using the SQL INTERVAL syntax, like this:

from pyspark.sql.functions import col, expr, when

adjust_expr = "date_trunc('hour', starttime) + INTERVAL 59 seconds + INTERVAL 59 minutes"

df.withColumn("endtime",
              when(col("endtime") > expr(adjust_expr),
                   expr(adjust_expr)
                  ).otherwise(col("endtime"))
              )\
  .show()

Gives:

+-----+-------------------+-------------------+
| name|          starttime|            endtime|
+-----+-------------------+-------------------+
|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|
|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
+-----+-------------------+-------------------+
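
Assuming endtime is never earlier than starttime, the same capping can also be written with least instead of when/otherwise. A minimal sketch, under that assumption:

from pyspark.sql.functions import col, expr, least

# Keep whichever is smaller: the original endtime or the last second of starttime's hour
df.withColumn("endtime", least(col("endtime"), expr(adjust_expr))).show()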

Upvotes: 1

pault

Reputation: 43494

Assuming your DataFrame has the following schema:

df.printSchema()
#root
# |-- Name: string (nullable = true)
# |-- starttime: timestamp (nullable = true)
# |-- endtime: timestamp (nullable = true)

i.e. where starttime and endtime are both TimestampType().

You can check if the endtime bleeds into the next hour by comparing the hour parts of starttime and endtime. If they are not equal¹, that means you need to truncate the end time.

from pyspark.sql.functions import col, hour

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name|          starttime|            endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|                false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:02:10|                 true|
#+-----+-------------------+-------------------+---------------------+

This tells you which rows need to be modified. You can almost get to the desired output using date_trunc with the format parameter set to hour:

from pyspark.sql.functions import date_trunc, when

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime", 
    when(
        col("bleeds_into_next_hour"), 
        date_trunc('hour', "endtime")
    ).otherwise(col("endtime"))
).show()
#+-----+-------------------+-------------------+---------------------+
#| Name|          starttime|            endtime|bleeds_into_next_hour|
#+-----+-------------------+-------------------+---------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:03|                false|
#|user2|2019-08-13 13:34:10|2019-08-13 14:00:00|                 true|
#+-----+-------------------+-------------------+---------------------+

All you have to do now is subtract 1 second from endtime. The easiest way is to convert to a unix timestamp with unix_timestamp, subtract 1, and then convert back using from_unixtime.

from pyspark.sql.functions import from_unixtime, unix_timestamp

df.withColumn(
    "bleeds_into_next_hour", 
    hour(col("endtime")) != hour(col("starttime"))
).withColumn(
    "endtime", 
    from_unixtime(
        unix_timestamp(
            when(
                col("bleeds_into_next_hour"), 
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
).drop("bleeds_into_next_hour").show()
#+-----+-------------------+-------------------+
#| Name|          starttime|            endtime|
#+-----+-------------------+-------------------+
#|user1|2019-08-02 03:34:45|2019-08-02 03:52:02|
#|user2|2019-08-13 13:34:10|2019-08-13 13:59:59|
#+-----+-------------------+-------------------+

Putting it all together, without the intermediary column:

from pyspark.sql.functions import col, date_trunc, from_unixtime, hour, unix_timestamp, when

df = df.withColumn(
    "endtime", 
    from_unixtime(
        unix_timestamp(
            when(
                hour(col("endtime")) != hour(col("starttime")), 
                date_trunc('hour', "endtime")
            ).otherwise(col("endtime"))
        ) - 1
    )
)
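
One caveat: from_unixtime returns a string column (in yyyy-MM-dd HH:mm:ss format), so if you need endtime to remain a timestamp you can cast it back. A small sketch:

from pyspark.sql.functions import col

# Cast the formatted string produced by from_unixtime back to TimestampType
df = df.withColumn("endtime", col("endtime").cast("timestamp"))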

Notes

  1. Assuming endtime is always greater than or equal to starttime. You can't use > because the hour of day wraps around at midnight (23 back to 0); see the sketch below.
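
For instance, with an interval that crosses midnight, the > comparison would miss the row while != still flags it. An illustrative sketch with made-up timestamps (assuming a SparkSession named spark):

from pyspark.sql.functions import hour, to_timestamp

# starttime 23:50, endtime 00:05 the next day:
# hour(endtime) = 0 and hour(starttime) = 23, so the > comparison is False
# even though the interval crosses into a new hour, while != is True
wrap = spark.createDataFrame(
    [("user3", "2019-08-13 23:50:00", "2019-08-14 00:05:00")],
    ["Name", "starttime", "endtime"]
).select(
    "Name",
    to_timestamp("starttime").alias("starttime"),
    to_timestamp("endtime").alias("endtime")
)

wrap.select(
    (hour("endtime") > hour("starttime")).alias("greater_than"),
    (hour("endtime") != hour("starttime")).alias("not_equal")
).show()
#+------------+---------+
#|greater_than|not_equal|
#+------------+---------+
#|       false|     true|
#+------------+---------+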

Upvotes: 0
