danimille

Reputation: 350

PySpark Error using partition over a ranked column

I have two Spark Dataframes, the first one contains information related to Events as follows:

Id  User_id  Date
1   1        2021-08-15
2   2        2020-03-10

The second Dataframe contains information related to previous Purchases as below:

Id  User_id  Date
1   1        2021-07-15
2   1        2021-07-10
3   1        2021-04-12
4   2        2020-02-10

What I would like to know is how to get the quantity of purchases made by each user in the 90 days prior to the Event date.

The code I'm using is:

(events.join(purchase,
             on = [events.User_id == purchase.User_id,
                   events.Date >= purchase.Date],
             how = "left")
    .withColumn('rank_test', rank().over(W.partitionBy(purchase['User_id']).orderBy(col("Date").desc())))
    .withColumn('is90days', when(floor((events["Date"].cast('long') - purchase["Date"].cast('long'))/86400) <= 90, 1).otherwise(0))
    .where(col('is90days') == 1)
    .withColumn('maxPurchase', max('rank_test').over(W.partitionBy(events['ID'])))
    .where(col('rank_test') == col('maxPurchase'))
)

But I'm getting the following error:

AttributeError: 'str' object has no attribute 'over'

What I was expecting is a table as follows:

Id  User_id  Date        qtyPurchasePast90days
1   1        2021-08-15  2
2   2        2020-03-10  1

I appreciate your time in helping me! Regards

Upvotes: 0

Views: 353

Answers (1)

pltc

Reputation: 6082

In your code (line 8, the maxPurchase column), Python's built-in max is being called instead of the Spark function max, so max('rank_test') returns the string 't', which has no .over method. The main reason is that you (and many others) import Spark functions in a way that is not recommended:

# DON'T do this: a wildcard import makes it easy to lose track of which max you are calling
from pyspark.sql.functions import *
max('rank_test')   # built-in max of the string's characters: 't'

# DO this: the F prefix makes it unambiguous that you want the Spark function
from pyspark.sql import functions as F
F.max('rank_test') # Column<'max(rank_test)'>
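
For completeness, here is a minimal, self-contained sketch that uses the F-style import to produce the qtyPurchasePast90days table from the question. It swaps the rank/window approach for a plain join plus groupBy/count, which is one way to get a single count per event; the names result, e and p are just for illustration.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data from the question
events = spark.createDataFrame(
    [(1, 1, "2021-08-15"), (2, 2, "2020-03-10")],
    ["Id", "User_id", "Date"],
).withColumn("Date", F.to_date("Date"))

purchase = spark.createDataFrame(
    [(1, 1, "2021-07-15"), (2, 1, "2021-07-10"),
     (3, 1, "2021-04-12"), (4, 2, "2020-02-10")],
    ["Id", "User_id", "Date"],
).withColumn("Date", F.to_date("Date"))

# Join each event to the purchases made by the same user within the
# 90 days before (and including) the event date, then count them.
result = (
    events.alias("e")
    .join(
        purchase.alias("p"),
        on=[
            F.col("e.User_id") == F.col("p.User_id"),
            F.col("p.Date").between(F.date_sub(F.col("e.Date"), 90), F.col("e.Date")),
        ],
        how="left",
    )
    .groupBy(F.col("e.Id"), F.col("e.User_id"), F.col("e.Date"))
    .agg(F.count("p.Id").alias("qtyPurchasePast90days"))
)

result.show()
# Should give 2 purchases for event 1 and 1 for event 2,
# matching the expected table in the question.

A window/rank version would also work once the Spark max is used, but for a single count per event the join-and-aggregate form keeps the query simpler.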

Upvotes: 3
