Bas

Reputation: 607

PySpark get value of previous group

I'm trying to get the previous value within the same group, using a DataFrame and PySpark, but I'm unable to get this to work when the group consists of two columns (date and text).

from pyspark.sql import Window
import pyspark.sql.functions as func

window = Window.partitionBy("date", "text").orderBy("date", "text")
df2 = df2.withColumn('prev_count', func.lag(df2['count']).over(window))

Resulting in:

+--------+----+-----+---+----------+
|    date|text|count|day|prev_count|
+--------+----+-----+---+----------+
|20180901| cat|    2|  1|      null|
|20180901| dog|    2|  1|      null|
|20180902| cat|    3|  2|      null|
|20180902| dog|    6|  2|      null|
|20180903| cat|    2|  3|      null|
|20180904| cat|    3|  4|      null|
|20180905| cat|    2|  5|      null|
|20180905| dog|    4|  5|      null|
+--------+----+-----+---+----------+

The desired output:

+--------+----+-----+---+----------+
|    date|text|count|day|prev_count|
+--------+----+-----+---+----------+
|20180901| cat|    2|  1|      null|
|20180901| dog|    2|  1|      null|
|20180902| cat|    3|  2|         2|
|20180902| dog|    6|  2|         2|
|20180903| cat|    2|  3|         3|
|20180904| cat|    3|  4|         2|
|20180905| cat|    2|  5|         3|
|20180905| dog|    4|  5|         6|
+--------+----+-----+---+----------+

The goal is to compare each text's count on a given day with its count on the previous day.
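For context, once prev_count is filled in, the comparison itself would just be a column difference, along these lines (a sketch; count_change is a hypothetical column name):

import pyspark.sql.functions as func

# Hypothetical follow-up step: day-over-day change per text value.
df2 = df2.withColumn('count_change', func.col('count') - func.col('prev_count'))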

Thank you.

Upvotes: 0

Views: 585

Answers (1)

Ali Yesilli

Reputation: 2200

I think you should remove the "date" field from the partitionBy statement. Each ("date", "text") combination is unique, so every partition contains exactly one row, and lag has no previous row to look back at. That is why all the values come back null.
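You can see this directly by counting rows per combination (a quick check against the DataFrame built below); every group has exactly one row, so there is never a previous row inside the window:

# Sketch: every ('date', 'text') partition holds a single row,
# so lag() has nothing to look back at and returns null.
df.groupBy('date', 'text').count().show()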

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as func
>>> 
>>> data = sc.parallelize([
...     ('20180901','cat',2,1),
...     ('20180901','dog',2,1),
...     ('20180902','cat',3,2),
...     ('20180902','dog',6,2),
...     ('20180903','cat',2,3),
...     ('20180904','cat',3,4),
...     ('20180905','cat',2,5),
...     ('20180905','dog',4,5)])
>>> 
>>> columns = ['date','text','count','day']
>>> df = spark.createDataFrame(data, columns)
>>> 
>>> window = Window.partitionBy('text').orderBy('date')
>>> df = df.withColumn('prev_count', func.lag('count').over(window))
>>> 
>>> df.sort('date','text').show()
+--------+----+-----+---+----------+
|    date|text|count|day|prev_count|
+--------+----+-----+---+----------+
|20180901| cat|    2|  1|      null|
|20180901| dog|    2|  1|      null|
|20180902| cat|    3|  2|         2|
|20180902| dog|    6|  2|         2|
|20180903| cat|    2|  3|         3|
|20180904| cat|    3|  4|         2|
|20180905| cat|    2|  5|         3|
|20180905| dog|    4|  5|         6|
+--------+----+-----+---+----------+
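As a side note, if you would rather see 0 than null on the first day of each text (an assumption about what you want; the question only asks for the previous value), lag accepts a default value as its third argument:

>>> # Third argument is the default returned when no previous row exists.
>>> window = Window.partitionBy('text').orderBy('date')
>>> df = df.withColumn('prev_count', func.lag('count', 1, 0).over(window))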

Upvotes: 1
