Reputation: 699
I have the following custom function to perform aggregations on my pandas dataframe, and I want to do the same thing in PySpark:
import pandas as pd

def custom_aggregation_pandas(x, queries):
    names = {}
    for k, v in queries.items():
        plus = x.query(v["plus_credit"])['OBNETCRE'].sum() + x.query(v["plus_debit"])['OBNETDEB'].sum()
        minus = x.query(v["minus_credit"])['OBNETCRE'].sum() + x.query(v["minus_debit"])['OBNETDEB'].sum()
        names[k] = plus - minus
    return pd.Series(names, index=list(names.keys()))

df = df.groupby(['LBUDG']).apply(custom_aggregation_pandas, queries).sum()
where queries is a dictionary of queries like
{
    'first_queries': {
        'plus_credit': "classe_compte_rg2 in ('237', '238')",
        'plus_debit': "classe_compte_rg2 in ('237', '238')",
        'minus_credit': "classe_compte_rg2 in ('237', '238')",
        'minus_debit': "classe_compte_rg1 in ('20', '21', '23')"
    }
}
So, I replaced the pandas query by PySpark SQL:
def custom_aggregation_pyspark(x, queries):
    x.createOrReplaceTempView("df")
    names = {}
    for k, v in queries.items():
        plus = spark.sql("SELECT * FROM df WHERE " + v["plus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() \
             + spark.sql("SELECT * FROM df WHERE " + v["plus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
        minus = spark.sql("SELECT * FROM df WHERE " + v["minus_credit"]).select('OBNETCRE').groupby('OBNETCRE').sum().collect() \
              + spark.sql("SELECT * FROM df WHERE " + v["minus_debit"]).select('OBNETDEB').groupby('OBNETDEB').sum().collect()
        names[k] = plus - minus
    return pd.Series(names, index=list(names.keys()))

df.groupby("LBUDG").agg(custom_aggregation_pyspark(df, queries))
I am certainly going in the wrong direction, as the above code does not work. Could you please guide me on where I should look?
The desired output is a table grouped by LBUDG (a string), where the other columns are computed with custom aggregation functions.
Edit: DataFrame sample:
LBUDG | OBNETCRE | OBNETDEB | classe_compte_rg0 | classe_compte_rg1 |
---|---|---|---|---|
LE POIZAT | 0,00 | 0,00 | 1 | 10 |
LE POIZAT | 67572,00 | 0,00 | 1 | 10 |
LE POIZAT | 0,00 | 0,00 | 1 | 10 |
LE POIZAT | 4908,12 | 0,00 | 1 | 10 |
LE POIZAT | 0,00 | 0,00 | 1 | 10 |
DAFOUR | 295240,67 | 0,00 | 1 | 10 |
LE POIZAT | 0,00 | 0,00 | 1 | 11 |
LE POIZAT | 0,00 | 0,00 | 1 | 12 |
LE POIZAT | 0,00 | 0,00 | 1 | 13 |
LE POIZAT | 0,00 | 0,00 | 1 | 13 |
LE POIZAT | 53697,94 | 0,00 | 1 | 13 |
Expected output:
LBUDG | AGG1 | AGG2 |
---|---|---|
LE POIZAT | agg1_value for LE POIZAT | ... |
DAFOUR | .... | ... |
where AGG1 corresponds, for example, to the sum of OBNETCRE - OBNETDEB over the rows where classe_compte_rg1 is 10 or 11.
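For instance, a hypothetical queries entry expressing that AGG1 rule with the structure above could look like the sketch below; '__no_match__' is just a placeholder condition that selects no rows, so only OBNETCRE is added and only OBNETDEB is subtracted:

agg1_query = {
    'AGG1': {
        'plus_credit':  "classe_compte_rg1 in ('10', '11')",                      # add OBNETCRE of these rows
        'plus_debit':   "classe_compte_rg1 in ('__no_match__', '__no_match__')",  # placeholder, matches no row
        'minus_credit': "classe_compte_rg1 in ('__no_match__', '__no_match__')",  # placeholder, matches no row
        'minus_debit':  "classe_compte_rg1 in ('10', '11')"                       # subtract OBNETDEB of these rows
    }
}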
Upvotes: 0
Views: 314
Reputation: 32700
You can use expr to evaluate the conditions passed in the queries dict and use conditional aggregation to calculate the sums. Here's an example that is equivalent to the one you gave in pandas:
from pyspark.sql import functions as F

def custom_aggregation_pyspark(df, regles_calcul):
    df1 = df.groupBy("LBUDG") \
        .agg(
            *[
                ((F.sum(F.when(F.expr(v["plus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
                  F.sum(F.when(F.expr(v["plus_debit"]), F.col("OBNETDEB")).otherwise(0))) -
                 (F.sum(F.when(F.expr(v["minus_credit"]), F.col("OBNETCRE")).otherwise(0)) +
                  F.sum(F.when(F.expr(v["minus_debit"]), F.col("OBNETDEB")).otherwise(0)))
                ).alias(k)
                for k, v in regles_calcul.items()
            ]
        )

    return df1
df = custom_aggregation_pyspark(df, queries)
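For completeness, here is a minimal usage sketch, assuming an active SparkSession named spark. The rows are taken from the sample in your question, with the comma decimals written as plain floats and only the columns used by the rules kept; the classe_compte_rg2 values are made up for illustration, since that column is not shown in the sample:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A few sample rows; classe_compte_rg2 values are invented to exercise the conditions.
sample_df = spark.createDataFrame(
    [
        ("LE POIZAT", 67572.00, 0.00, "10", "237"),
        ("LE POIZAT", 4908.12, 0.00, "10", "102"),
        ("DAFOUR", 295240.67, 0.00, "10", "238"),
        ("LE POIZAT", 53697.94, 0.00, "13", "130"),
    ],
    ["LBUDG", "OBNETCRE", "OBNETDEB", "classe_compte_rg1", "classe_compte_rg2"],
)

# The queries dict from the question, reused as-is.
queries = {
    'first_queries': {
        'plus_credit': "classe_compte_rg2 in ('237', '238')",
        'plus_debit': "classe_compte_rg2 in ('237', '238')",
        'minus_credit': "classe_compte_rg2 in ('237', '238')",
        'minus_debit': "classe_compte_rg1 in ('20', '21', '23')"
    }
}

custom_aggregation_pyspark(sample_df, queries).show()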
Upvotes: 1