Pdeuxa

Reputation: 699

pandas groupby.apply to pyspark

I have the following custom function to perform an aggregation on my pandas dataframe, and I want to do the same thing in PySpark:

import pandas as pd

def custom_aggregation_pandas(x, queries):
    names = {}
    for k, v in queries.items():
        # Sum the credits/debits matching each condition, then take the difference
        plus = x.query(v["plus_credit"])['OBNETCRE'].sum() + x.query(v["plus_debit"])['OBNETDEB'].sum()
        minus = x.query(v["minus_credit"])['OBNETCRE'].sum() + x.query(v["minus_debit"])['OBNETDEB'].sum()
        names[k] = plus - minus
    return pd.Series(names, index=list(names.keys()))


df = df.groupby(['LBUDG']).apply(custom_aggregation_pandas, queries).sum()

where queries is a dictionary of queries like

{'first_queries': {
    'plus_credit': "classe_compte_rg2 in ('237', '238')",
    'plus_debit': "classe_compte_rg2 in ('237', '238')",
    'minus_credit': "classe_compte_rg2 in ('237', '238')",
    'minus_debit': "classe_compte_rg1 in ('20', '21', '23')"
    }
}
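(Each of these strings is a pandas DataFrame.query expression; as a quick illustration on a made-up frame, not the question's real data:)

import pandas as pd

# Made-up frame for illustration; column names follow the question
x = pd.DataFrame({"classe_compte_rg2": ["237", "500"], "OBNETCRE": [10.0, 20.0]})

# query() keeps only the matching rows; the sum then runs over those rows
x.query("classe_compte_rg2 in ('237', '238')")["OBNETCRE"].sum()  # 10.0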

So, I replaced the pandas query calls with PySpark SQL:

def custom_aggregation_pyspark(x, queries):
    x.createOrReplaceTempView("df")
    names = {}
    for k, v in queries.items():
        plus = spark.sql("SELECT * FROM df WHERE " + v["plus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() \
            + spark.sql("SELECT * FROM df WHERE " + v["plus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
        minus = spark.sql("SELECT * FROM df WHERE " + v["minus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() \
            + spark.sql("SELECT * FROM df WHERE " + v["minus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
        names[k] = plus - minus
    return pd.Series(names, index=list(names.keys()))

df.groupby("LBUDG").agg(custom_aggregation_pyspark(df,queries))

I am certainly going in the wrong direction, as the above code does not work. Could you please point me to where I should look?

The desired output is a table grouped by LBUDG (a string), with the other columns computed by custom aggregation functions.

Edit: Dataframe sample:

LBUDG      OBNETCRE   OBNETDEB  classe_compte_rg0  classe_compte_rg1
LE POIZAT  0,00       0,00      1                  10
LE POIZAT  67572,00   0,00      1                  10
LE POIZAT  0,00       0,00      1                  10
LE POIZAT  4908,12    0,00      1                  10
LE POIZAT  0,00       0,00      1                  10
DAFOUR     295240,67  0,00      1                  10
LE POIZAT  0,00       0,00      1                  11
LE POIZAT  0,00       0,00      1                  12
LE POIZAT  0,00       0,00      1                  13
LE POIZAT  0,00       0,00      1                  13
LE POIZAT  53697,94   0,00      1                  13

Expected output:

LBUDG AGG1 AGG2
LE POIZAT agg1_value for LE POIZAT ...
DAFOUR .... ...

where AGG1 corresponds, for example, to the sum of OBNETCRE minus the sum of OBNETDEB over the rows where classe_compte_rg1 is 10 or 11.
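(With the sample above, that rule would give 67572.00 + 4908.12 = 72480.12 for LE POIZAT, since every OBNETDEB value is zero, and 295240.67 for DAFOUR.)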

Upvotes: 0

Views: 314

Answers (1)

blackbishop

Reputation: 32700

You can use expr to evaluate the conditions passed in the queries dict and use conditional aggregation to calculate the sums. Here's an example that is equivalent to the one you gave in pandas:

from pyspark.sql import functions as F


def custom_aggregation_pyspark(df, regles_calcul):
    # For each rule, sum OBNETCRE / OBNETDEB only over the rows matching the
    # corresponding condition (conditional aggregation), then take plus - minus
    df1 = df.groupBy("LBUDG").agg(
        *[
            ((F.sum(F.when(F.expr(v["plus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
              F.sum(F.when(F.expr(v["plus_debit"]), F.col("OBNETDEB")).otherwise(0))) -
             (F.sum(F.when(F.expr(v["minus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
              F.sum(F.when(F.expr(v["minus_debit"]), F.col("OBNETDEB")).otherwise(0)))
             ).alias(k)
            for k, v in regles_calcul.items()
        ]
    )
    return df1


df = custom_aggregation_pyspark(df, queries)
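For reference, here is a minimal self-contained run of that function. The sample frame, the 'false' predicates, and the expected figures below are my own illustration, assuming an active SparkSession named spark and numeric OBNETCRE/OBNETDEB columns (if yours were loaded as strings with comma decimal separators, cast them to double first):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample mirroring the question's data, with dot decimals
df = spark.createDataFrame(
    [("LE POIZAT", 67572.0, 0.0, "1", "10"),
     ("LE POIZAT", 4908.12, 0.0, "1", "10"),
     ("DAFOUR", 295240.67, 0.0, "1", "10")],
    ["LBUDG", "OBNETCRE", "OBNETDEB", "classe_compte_rg0", "classe_compte_rg1"],
)

# AGG1 = sum(OBNETCRE) - sum(OBNETDEB) where classe_compte_rg1 is 10 or 11;
# the 'false' literal disables the branches this rule does not need
queries = {
    "AGG1": {
        "plus_credit": "classe_compte_rg1 in ('10', '11')",
        "plus_debit": "false",
        "minus_credit": "false",
        "minus_debit": "classe_compte_rg1 in ('10', '11')",
    }
}

custom_aggregation_pyspark(df, queries).show()
# Expected, up to float rounding:
#   LE POIZAT -> 72480.12  (67572.00 + 4908.12 - 0.00)
#   DAFOUR    -> 295240.67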

Upvotes: 1
