Reputation: 135
I'm working on a price elasticity problem where I need to calculate the elasticity for each unique id.
My dataframe looks like
| id  | price | items |
|-----|-------|-------|
| 101 | 5     | 10    |
| 101 | 10    | 15    |
| 101 | 12    | 20    |
| 102 | 1     | 1     |
| 102 | 3     | 7     |
To find the elasticity, consider id 101: there are 3 price changes among its rows, and these three price changes with their corresponding item changes should form the new dataframe.
1) A price change of 5 (5 -> 10, or 10 -> 5) resulted in an item change of 5 (10 -> 15, or 15 -> 10), so the corresponding row is pricechange = 5, itemschange = 5.
2) A price change of 7 (5 -> 12, or 12 -> 5) resulted in an item change of 10 (10 -> 20, or 20 -> 10), so the corresponding row is pricechange = 7, itemschange = 10.
3) A price change of 2 (10 -> 12, or 12 -> 10) resulted in an item change of 5 (15 -> 20, or 20 -> 15), so the corresponding row is pricechange = 2, itemschange = 5.
The dataframe would be transformed to:
| id  | pricechange | itemschange |
|-----|-------------|-------------|
| 101 | 5           | 5           |
| 101 | 7           | 10          |
| 101 | 2           | 5           |
| 102 | 2           | 6           |
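To make the intended transformation explicit, here is a minimal plain-Python sketch of the pairwise differences I'm after (just for illustration, not the Spark code I want to end up with):

from itertools import combinations

rows = [(101, 5, 10), (101, 10, 15), (101, 12, 20), (102, 1, 1), (102, 3, 7)]

# group rows by id, then take the absolute difference for every pair of rows
by_id = {}
for id_, price, items in rows:
    by_id.setdefault(id_, []).append((price, items))

for id_, recs in by_id.items():
    for (p1, i1), (p2, i2) in combinations(recs, 2):
        print(id_, abs(p1 - p2), abs(i1 - i2))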
Upvotes: 1
Views: 187
Reputation: 43504
You can simply do an inner join of the DataFrame with itself on the id column. To avoid duplicate records, define a where clause that requires the left DataFrame's price to be greater than that of the right DataFrame. After the join, select the desired columns:
from pyspark.sql.functions import col

# self-join on id; keep only pairs where the left price is higher to avoid duplicates
df.alias("r").join(df.alias("l"), on="id")\
    .where("l.price > r.price")\
    .select(
        "id",
        (col("l.price") - col("r.price")).alias("pricechange"),
        (col("l.items") - col("r.items")).alias("itemschange"),
    ).show()
#+---+-----------+-----------+
#| id|pricechange|itemschange|
#+---+-----------+-----------+
#|101| 2| 5|
#|101| 7| 10|
#|101| 5| 5|
#|102| 2| 6|
#+---+-----------+-----------+
This will be more efficient than using a Window function.
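Since the question ultimately asks for an elasticity per id, here is a hedged follow-up sketch. The question doesn't specify the exact elasticity formula, so treating it as the ratio itemschange / pricechange per pair is only an assumption (a percentage-based definition would also need the base price and items carried through the select), and the name pairs is just for illustration:

from pyspark.sql.functions import col

# same self-join as above, kept as a DataFrame instead of calling show()
pairs = df.alias("r").join(df.alias("l"), on="id")\
    .where("l.price > r.price")\
    .select(
        "id",
        (col("l.price") - col("r.price")).alias("pricechange"),
        (col("l.items") - col("r.items")).alias("itemschange"),
    )

# assumption: "elasticity" here is approximated as the ratio of the changes
pairs.withColumn("elasticity", col("itemschange") / col("pricechange")).show()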
Upvotes: 1
Reputation: 4089
Here is a detailed approach you can follow.
Define the schema and prepare the data for the DataFrame:
df = spark.createDataFrame([
    (101, 5, 10),
    (101, 10, 15),
    (101, 12, 20),
    (102, 1, 1),
    (102, 3, 7)
], 'id : int, price : int, item : int')
Create a dummy rank column so that each record of an id can be compared with all the other records of the same id:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# assign a row number within each id so every pair of rows can be compared exactly once
windowSpec = Window.partitionBy('id').orderBy('id')
rank = row_number().over(windowSpec).alias('rank')

df = df.withColumn("rank", rank)
Final logic - Join and Filter
df.alias('a')\
    .join(df.alias('b'), on='id')\
    .where('a.rank < b.rank')\
    .selectExpr("a.id as id",
                "b.price - a.price as pricechange",
                "b.item - a.item as itemschange")\
    .show()
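One small caveat, as a hedged suggestion: because the window orders by id inside an id partition, the row numbers for those ties are assigned in an arbitrary order, so the sign of the differences is not guaranteed. Ordering the window by price instead (an assumption about which ordering you want) makes the rank deterministic and keeps the price differences non-negative:

from pyspark.sql.window import Window

# assumption: ordering by price (rather than id) gives a deterministic rank
# and keeps b.price - a.price non-negative in the join above
windowSpec = Window.partitionBy('id').orderBy('price')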
IMHO, it's always better to post what you have tried so far and what error/issue you are facing along with the question. This helps to get a quicker and better response.
Upvotes: 1