Reputation: 3043
I have a PySpark dataframe (say df1) with the following columns:
1. category - contains unique category types
2. start_time_array - array of timestamps in ascending order
3. end_time_array - array of timestamps in ascending order
4. lenStart - length of the array in start_time_array
5. lenEnd - length of the array in end_time_array
Following is an example of df1:
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B| [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 1|
+--------+------------------------------------------+------------------------------------------+--------+------+
There is another dataframe df2 which contains two columns, category and timestamp. df2 contains the same values of category as df1, and the timestamps inside the arrays in df1 are a subset of the timestamps in df2. Following is an example of df2:
+--------+-------------------+
|category| timestamp|
+--------+-------------------+
| A|2017-01-16 00:00:00|
| A|2017-01-18 00:00:00|
| A|2017-01-25 00:00:00|
| A|2017-01-27 00:00:00|
| B|2017-02-14 00:00:00|
| B|2017-02-18 00:00:00|
| B|2017-02-21 00:00:00|
| B|2017-02-22 00:00:00|
| B|2017-02-24 00:00:00|
| B|2017-02-25 00:00:00|
+--------+-------------------+
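For reference, the two example dataframes could be built like this for testing (a minimal sketch, assuming an existing SparkSession named spark; the column names match the ones above):
from datetime import datetime
import pyspark.sql.functions as F

df1 = spark.createDataFrame(
    [('A', [datetime(2017, 1, 18), datetime(2017, 1, 27)],
           [datetime(2017, 1, 16), datetime(2017, 1, 25)]),
     ('B', [datetime(2017, 2, 18)],
           [datetime(2017, 2, 14), datetime(2017, 2, 21)])],
    ['category', 'end_time_array', 'start_time_array']
).withColumn('lenStart', F.size('start_time_array')) \
 .withColumn('lenEnd', F.size('end_time_array'))

df2 = spark.createDataFrame(
    [('A', datetime(2017, 1, 16)), ('A', datetime(2017, 1, 18)),
     ('A', datetime(2017, 1, 25)), ('A', datetime(2017, 1, 27)),
     ('B', datetime(2017, 2, 14)), ('B', datetime(2017, 2, 18)),
     ('B', datetime(2017, 2, 21)), ('B', datetime(2017, 2, 22)),
     ('B', datetime(2017, 2, 24)), ('B', datetime(2017, 2, 25))],
    ['category', 'timestamp']
)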
As we see in the above example of df1, for category B, lenStart=2 is not equal to lenEnd=1. In all rows of df1, either lenStart = lenEnd or lenStart = lenEnd + 1. For all rows in df1 where lenStart = lenEnd + 1, I want to take the largest value of timestamp (of the appropriate category) from df2 and place it at the end of the array in end_time_array. How can I do this?
Following is the expected output after processing df1 using information from df2:
+--------+------------------------------------------+------------------------------------------+--------+------+
|category| end_time_array| start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
| A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]| 2| 2|
| B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]| 2| 2|
+--------+------------------------------------------+------------------------------------------+--------+------+
Upvotes: 2
Views: 202
Reputation: 786
This should work on Spark 2.4+ (where F.concat can concatenate array columns):
import pyspark.sql.functions as F
# categories where end_time_array is one element shorter than start_time_array
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
# the largest timestamp in df2 for each of those categories
df4 = df2.join(df3, 'category').groupby('category').agg(F.max('timestamp').alias('max'))
# left join so categories that need no fix get a null 'max'
df5 = df1.join(df4, 'category', 'left')
# append the max timestamp to end_time_array only where 'max' is not null
df1_changed = df5.withColumn(
    'end_time_array',
    F.when(F.col('max').isNull(), F.col('end_time_array'))
     .otherwise(F.concat(F.col('end_time_array'), F.array(F.col('max'))))
).drop('max')
# recompute lenEnd to reflect the updated array length
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))
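On Spark versions before 2.4, where concat only handles string/binary columns, the same append could be done with a small UDF (a minimal sketch under that assumption):
from pyspark.sql.types import ArrayType, TimestampType

# UDF that appends ts to arr, leaving arr unchanged when ts is null
append_ts = F.udf(lambda arr, ts: arr if ts is None else arr + [ts],
                  ArrayType(TimestampType()))
df1_changed = df5.withColumn('end_time_array',
                             append_ts(F.col('end_time_array'), F.col('max'))).drop('max')
# lenEnd can then be recomputed with F.size as above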
df1_changed will have a modified end_time_array column, with the wanted value appended when the condition you requested applies; otherwise it remains unchanged.
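For example, to check the result against the expected output in the question:
df1_changed.select('category', 'end_time_array', 'start_time_array',
                   'lenStart', 'lenEnd').show(truncate=False)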
Upvotes: 2