ComputerVisionEngineer

Reputation: 135

PySpark: User-defined elasticity for each unique id

I'm working on a price elasticity problem where I need to calculate the elasticity for each unique id.

My DataFrame looks like this:

| id  | price | items |
|-----|-------|-------|
| 101 | 5     | 10    |
| 101 | 10    | 15    |
| 101 | 12    | 20    |
| 102 | 1     | 1     |
| 102 | 3     | 7     |

To find the elasticity, consider id 101 as an example: there are 3 possible price changes among its rows, and these three price changes with the corresponding item changes should form the new DataFrame.

1) A price change of 5 (5 -> 10, or 10 -> 5) results in an items change of 5 (10 -> 15, or 15 -> 10), so the corresponding row would be pricechange=5, itemschange=5.

2) A price change of 7 (5 -> 12, or 12 -> 5) results in an items change of 10 (10 -> 20, or 20 -> 10), so the corresponding row would be pricechange=7, itemschange=10.

3) A price change of 2 (10 -> 12, or 12 -> 10) results in an items change of 5 (15 -> 20, or 20 -> 15), so the corresponding row would be pricechange=2, itemschange=5.

The DataFrame would be transformed to:

| id  | pricechange | itemschange |
|-----|-------------|-------------|
| 101 | 5           | 5           |
| 101 | 7           | 10          |
| 101 | 2           | 5           |
| 102 | 2           | 6           |
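To make the pairwise logic above concrete, here is a minimal plain-Python sketch (not my Spark code, names are only illustrative) that reproduces the expected rows from the sample data:

from itertools import combinations

# sample rows from the table above: (id, price, items)
rows = [(101, 5, 10), (101, 10, 15), (101, 12, 20), (102, 1, 1), (102, 3, 7)]

# group the (price, items) pairs by id
by_id = {}
for i, price, items in rows:
    by_id.setdefault(i, []).append((price, items))

# every unordered pair of rows within an id gives one (pricechange, itemschange) row
result = [
    (i, abs(p2 - p1), abs(q2 - q1))
    for i, vals in by_id.items()
    for (p1, q1), (p2, q2) in combinations(vals, 2)
]
print(result)
# [(101, 5, 5), (101, 7, 10), (101, 2, 5), (102, 2, 6)]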

Upvotes: 1

Views: 187

Answers (2)

pault

Reputation: 43504

You can simply do an inner join of the DataFrame with itself on the id column. To avoid duplicate records, define a where clause that requires the left DataFrame's price to be greater than that of the right DataFrame.

After the join, select the desired columns:

from pyspark.sql.functions import col

# self-join on id, keeping only pairs where the "l" side's price is higher to avoid duplicate pairs
df.alias("r").join(df.alias("l"), on="id")\
    .where("l.price > r.price")\
    .select(
        "id",
        (col("l.price") - col("r.price")).alias("pricechange"),
        (col("l.items") - col("r.items")).alias("itemschange"),
    ).show()
#+---+-----------+-----------+
#| id|pricechange|itemschange|
#+---+-----------+-----------+
#|101|          2|          5|
#|101|          7|         10|
#|101|          5|          5|
#|102|          2|          6|
#+---+-----------+-----------+

This will be more efficient than using a Window.

Upvotes: 1

Shantanu Sharma

Reputation: 4089

Here is a detailed approach you can follow.

Define the schema and prepare the data for the DataFrame:

df = spark.createDataFrame([
    (101, 5, 10),
    (101, 10, 15),
    (101, 12, 20),
    (102, 1, 1),
    (102, 3, 7)
], 'id : int, price : int, items : int')

Create a dummy rank column so that each record can be compared with all other records of the same id:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# order by price within each id so that the later pairwise differences come out non-negative
windowSpec = Window.partitionBy('id').orderBy('price')
rank = row_number().over(windowSpec).alias('rank')

df = df.withColumn("rank", rank)
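At this point, with the orderBy('price') above, df should look roughly like this (row order across ids may vary):

df.show()
#+---+-----+-----+----+
#| id|price|items|rank|
#+---+-----+-----+----+
#|101|    5|   10|   1|
#|101|   10|   15|   2|
#|101|   12|   20|   3|
#|102|    1|    1|   1|
#|102|    3|    7|   2|
#+---+-----+-----+----+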

Final logic - join and filter:

df.alias('a').\
    join(df.alias('b'), on='id').\
    where('a.rank < b.rank').\
    selectExpr("a.id as id",
               "b.price - a.price as pricechange",
               "b.items - a.items as itemschange").\
    show()
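With the rank ordered by price as above, the result should match the expected output from the question (row order may vary):

#+---+-----------+-----------+
#| id|pricechange|itemschange|
#+---+-----------+-----------+
#|101|          5|          5|
#|101|          7|         10|
#|101|          2|          5|
#|102|          2|          6|
#+---+-----------+-----------+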

IMHO - it's always better to post what you have tried so far and what error/issue you are facing along with the question. This helps to get a quicker and better response.

Upvotes: 1
