Siddharth Satpathy

Reputation: 3043

Put the largest timestamp value into an array in PySpark

I have a PySpark dataframe (say df1) with the following columns:

1. category - contains unique category types

2. start_time_array - array of timestamps in ascending order

3. end_time_array - array of timestamps in ascending order

4. lenStart - length of the array in start_time_array

5. lenEnd - length of the array in end_time_array

Following is an example of df1:

+--------+------------------------------------------+------------------------------------------+--------+------+
|category|                            end_time_array|                          start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
|       A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]|       2|     2|
|       B|                     [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]|       2|     1|
+--------+------------------------------------------+------------------------------------------+--------+------+

There is another dataframe, df2, with two columns: category and timestamp. df2 contains the same values of category as df1, and the timestamps inside the arrays in df1 are a subset of the timestamps in df2. Following is an example of df2:

+--------+-------------------+
|category|          timestamp|
+--------+-------------------+
|       A|2017-01-16 00:00:00|
|       A|2017-01-18 00:00:00|
|       A|2017-01-25 00:00:00|
|       A|2017-01-27 00:00:00|
|       B|2017-02-14 00:00:00|
|       B|2017-02-18 00:00:00|
|       B|2017-02-21 00:00:00|
|       B|2017-02-22 00:00:00|
|       B|2017-02-24 00:00:00|
|       B|2017-02-25 00:00:00|
+--------+-------------------+

As the example above shows, for category B, lenStart = 2 is not equal to lenEnd = 1. In every row of df1, either lenStart = lenEnd or lenStart = lenEnd + 1. For all rows of df1 where lenStart = lenEnd + 1, I want to take the largest timestamp in df2 for the appropriate category and append it to the array in end_time_array. How can I do this?

Following is the expected output after processing df1 using the information from df2:

+--------+------------------------------------------+------------------------------------------+--------+------+
|category|                            end_time_array|                          start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
|       A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]|       2|     2|
|       B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]|       2|     2|
+--------+------------------------------------------+------------------------------------------+--------+------+
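For reference, here is a minimal sketch that builds the example dataframes above. The SparkSession setup and the ts helper are my own additions for reproducibility, not part of the actual pipeline:

import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def ts(s):
    # parse '2017-01-18 00:00:00' into a Python datetime
    return datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S')

df1 = spark.createDataFrame(
    [('A', [ts('2017-01-18 00:00:00'), ts('2017-01-27 00:00:00')],
           [ts('2017-01-16 00:00:00'), ts('2017-01-25 00:00:00')], 2, 2),
     ('B', [ts('2017-02-18 00:00:00')],
           [ts('2017-02-14 00:00:00'), ts('2017-02-21 00:00:00')], 2, 1)],
    ['category', 'end_time_array', 'start_time_array', 'lenStart', 'lenEnd'])

df2 = spark.createDataFrame(
    [('A', ts('2017-01-16 00:00:00')), ('A', ts('2017-01-18 00:00:00')),
     ('A', ts('2017-01-25 00:00:00')), ('A', ts('2017-01-27 00:00:00')),
     ('B', ts('2017-02-14 00:00:00')), ('B', ts('2017-02-18 00:00:00')),
     ('B', ts('2017-02-21 00:00:00')), ('B', ts('2017-02-22 00:00:00')),
     ('B', ts('2017-02-24 00:00:00')), ('B', ts('2017-02-25 00:00:00'))],
    ['category', 'timestamp'])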

Upvotes: 2

Views: 202

Answers (1)

David Taub

Reputation: 786

This should work on Spark 2.4+ (F.concat only accepts array columns from 2.4 onward; on earlier versions you would need a UDF to append to the array):

import pyspark.sql.functions as F

# Categories where one end timestamp is missing (lenStart = lenEnd + 1)
df3 = df1.where(F.col('lenStart') == F.col('lenEnd') + 1).select('category')
# Largest timestamp in df2 for each of those categories
df4 = df2.join(df3, 'category').groupby('category').agg(F.max('timestamp').alias('max'))
# Left join keeps every row of df1; rows that need no fix get max = null
df5 = df1.join(df4, 'category', 'left')
# Append the max timestamp to end_time_array only where one is missing
df1_changed = df5.withColumn(
    'end_time_array',
    F.when(F.col('max').isNull(), F.col('end_time_array'))
     .otherwise(F.concat(F.col('end_time_array'), F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))

df1_changed has a modified end_time_array column with the wanted value appended whenever your condition applies; otherwise the array remains unchanged.
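To match your expected output exactly, you can then drop the helper column (named max in the code above) and inspect the result:

df1_changed = df1_changed.drop('max')
df1_changed.show(truncate=False)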

Upvotes: 2
