Rahul Patidar

Reputation: 209

Taking a sum with a filter condition in PySpark

I'm doing some aggregation on PySpark DataFrames in which I need to check the PAYMNT_STATUS column and, based on its value, take the sum of different columns and assign each sum to a new column. I am also counting the distinct states and assigning that count to a new column.

I wrote my PySpark query as follows:

   result=df1.groupby(countrry,year).withColumn('Amt1',sf.when(sf.col('PAYMNT_STATUS')==='A1',
   sf.sum('Amt1_RS'))).withColumn("Amt2",sf.when(sf.col("PAYMNT_STATUS")==="B1",
   sf.sum("Amt2_RS"))).agg(sf.sum("amt3").alias("amt_rs"))
    .agg(countDistinct("state")).alias("state_count")

But I'm getting a syntax error. Can someone please guide me on how to rewrite this code?

Equivalent SQL:

    SUM(DECODE(PAYMNT_STATUS, 'A1', NVL(Amt1_RS, 0))) AS Amt1,
    SUM(DECODE(PAYMNT_STATUS, 'B1', NVL(Amt2_RS, 0))) AS Amt2,
    SUM(ISNULL(amt3, 0)) AS amt_rs,
    COUNT(DISTINCT state) AS state_count

For now, I'm leaving out the NVL and ISNULL parts.

Upvotes: 0

Views: 2171

Answers (1)

Sravya

Reputation: 21

You can try something like this:

    import pyspark.sql.functions as sf

    filtering_condition = "PAYMNT_STATUS == 'A1'"   # could be extended to more conditions
    filtering_condition2 = "PAYMNT_STATUS == 'B1'"  # could be extended to more conditions

    # groupBy() returns GroupedData, which has no withColumn(); do the sums inside agg()
    result = df1.groupBy("country", "year").agg(
        sf.sum(sf.when(sf.expr(filtering_condition), sf.col("Amt1_RS")).otherwise(0)).alias("Amt1"),
        sf.sum(sf.when(sf.expr(filtering_condition2), sf.col("Amt2_RS")).otherwise(0)).alias("Amt2"),
    )

Upvotes: 1
