Reputation: 2421
I'm trying to group and sum a PySpark (2.4) DataFrame, but I can only get the values one by one.
I have the following DataFrame:
data.groupBy("card_scheme", "failed").count().show()
+----------------+------+------+
| card_Scheme|failed| count|
+----------------+------+------+
| jcb| false| 4|
|american express| false| 22084|
| AMEX| false| 4|
| mastercard| true| 1122|
| visa| true| 1975|
| visa| false|126372|
| CB| false| 6|
| discover| false| 2219|
| maestro| false| 2|
| VISA| false| 13|
| mastercard| false| 40856|
| MASTERCARD| false| 9|
+----------------+------+------+
I'm trying to calculate the formula X = false / (false + true)
for each card_scheme and still get one dataframe in the end.
I'm expecting something like:
| card_scheme | X |
|-------------|---|
| jcb | 1 |
| .... | . |
| visa | 0.9846 | (which is 126372 / (126372 + 1975))
| ... | . |
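As a quick sanity check of the formula, the expected values can be worked out by hand from the counts in the table above. This is a plain-Python sketch rather than PySpark; the `counts` dictionary and the `success_ratio` helper are illustrative names, not part of the original question.

```python
# Counts copied from the grouped table above: {card_scheme: (false_count, true_count)}.
# A scheme with no "true" row is given a true_count of 0.
counts = {
    "jcb": (4, 0),
    "visa": (126372, 1975),
    "mastercard": (40856, 1122),
}


def success_ratio(false_count, true_count):
    """X = false / (false + true)."""
    return false_count / (false_count + true_count)


ratios = {scheme: success_ratio(f, t) for scheme, (f, t) in counts.items()}
print(round(ratios["visa"], 4))  # 0.9846
```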
Upvotes: 3
Views: 1355
Reputation: 561
First split the root DataFrame into two DataFrames:
df_true = data.filter(data.failed == True).alias("df1")
df_false = data.filter(data.failed == False).alias("df2")
Then a full outer join gives the final result:
from pyspark.sql.functions import coalesce, col, lit
# A scheme missing from one side of the join has a null count there; treat it as 0.
false_count = coalesce(col("df2.count"), lit(0))
true_count = coalesce(col("df1.count"), lit(0))
# Join on the aliased columns so the two sides of the self-join are not ambiguous.
df_result = df_true.join(df_false, col("df1.card_scheme") == col("df2.card_scheme"), "outer") \
.select(coalesce(col("df1.card_scheme"), col("df2.card_scheme")).alias("card_scheme"),
(false_count / (false_count + true_count)).alias("X"))
No further groupBy is needed, just two extra DataFrames and a join.
Upvotes: 2
Reputation: 1564
from pyspark.sql import functions as func
from pyspark.sql.functions import col
data = data.groupby("card_scheme", "failed").count()
Create 2 new dataframes:
a = data.filter(col("failed") == False).groupby("card_scheme").agg(func.sum("count").alias("num"))
b = data.groupby("card_scheme").agg(func.sum("count").alias("den"))
Join both the dataframes:
c = a.join(b, on="card_scheme")  # joining on the column name keeps a single card_scheme column
Divide one column with another:
c = c.withColumn('X', c.num / c.den)
Upvotes: 1
Reputation: 7597
Creating the dataset
myValues = [('jcb',False,4),('american express', False, 22084),('AMEX',False,4),('mastercard',True,1122),('visa',True,1975),('visa',False,126372),('CB',False,6),('discover',False,2219),('maestro',False,2),('VISA',False,13),('mastercard',False,40856),('MASTERCARD',False,9)]
df = sqlContext.createDataFrame(myValues,['card_Scheme','failed','count'])
df.show()
+----------------+------+------+
| card_Scheme|failed| count|
+----------------+------+------+
| jcb| false| 4|
|american express| false| 22084|
| AMEX| false| 4|
| mastercard| true| 1122|
| visa| true| 1975|
| visa| false|126372|
| CB| false| 6|
| discover| false| 2219|
| maestro| false| 2|
| VISA| false| 13|
| mastercard| false| 40856|
| MASTERCARD| false| 9|
+----------------+------+------+
Method 1: This method will be slower, as it involves a transpose via pivot.
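from pyspark.sql.functions import col, when
# Keep the result in a new variable so the original df stays available for Method 2.
df1 = df.groupBy("card_Scheme").pivot("failed").sum("count")
# The pivot creates one column per value of failed: 'false' and 'true'.
df1 = df1.withColumn('X', when(col('true').isNotNull(), col('false') / (col('false') + col('true'))).otherwise(1))
df1 = df1.select('card_Scheme', 'X')
df1.show()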
+----------------+------------------+
| card_Scheme| X|
+----------------+------------------+
| VISA| 1.0|
| jcb| 1.0|
| MASTERCARD| 1.0|
| maestro| 1.0|
| AMEX| 1.0|
| mastercard|0.9732717137548239|
|american express| 1.0|
| CB| 1.0|
| discover| 1.0|
| visa|0.9846120283294506|
+----------------+------------------+
Method 2: Use a window function. This will be a lot faster.
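from pyspark.sql.functions import col, sum as sum_  # Spark's sum, not Python's builtin
from pyspark.sql.window import Window
# Total per (card_scheme, failed), divided by the total per card_scheme.
df2 = df.groupBy("card_scheme", "failed").agg(sum_("count").alias("total")) \
.withColumn("X", col("total") / sum_("total").over(Window.partitionBy("card_scheme"))) \
.where(col("failed") == False).drop("failed", "total")
df2.show()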
+----------------+------------------+
| card_scheme| X|
+----------------+------------------+
| VISA| 1.0|
| jcb| 1.0|
| MASTERCARD| 1.0|
| maestro| 1.0|
| AMEX| 1.0|
| mastercard|0.9732717137548239|
|american express| 1.0|
| CB| 1.0|
| discover| 1.0|
| visa|0.9846120283294506|
+----------------+------------------+
Upvotes: 3
Reputation: 13001
A simple solution would be to do a second groupby:
val grouped_df = data.groupBy("card_scheme", "failed").count() // your dataframe
val with_countFalse = grouped_df.withColumn("countfalse", when($"failed" === "false", $"count").otherwise(lit(0)))
with_countFalse.groupBy("card_scheme").agg((sum($"countfalse") / sum($"count")).alias("X")).show()
The idea is to create a second column, countfalse, that holds the count when failed = false and 0 otherwise. The sum of the count column then gives false + true, while the sum of countfalse gives just false. A second groupBy on card_scheme then yields the ratio.
Note: Some of the other answers use pivot. I believe the pivot solution would be slower (it does more). However, if you do choose to use it, pass the specific values to the pivot call, i.e. pivot("failed", ["true", "false"]), to improve performance. Otherwise Spark has to make two passes over the data (the first one to find the distinct values).
Upvotes: 1
Reputation: 1020
data.groupBy("card_scheme").pivot("failed").agg(count("card_scheme"))
should work. I am not sure about the agg(count(any_column)) part, but the key is the pivot function. The result has two new columns, false and true, from which you can easily calculate x = false / (false + true).
Upvotes: 1