Reputation: 22832
How can I get the first non-null values from a group by? I tried using first with coalesce, F.first(F.coalesce("code")), but I don't get the desired behaviour (I seem to get the values of the first row, nulls included).
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext("local")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])
I tried:
(df
    .groupby("id")
    .agg(F.first(F.coalesce("code")),
         F.first(F.coalesce("name")))
    .collect())
Desired output:
[Row(id='a', code='code1', name='name2')]
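Why the attempt returns nulls: `F.coalesce` works row by row and returns the first non-null of its arguments, so with a single column it simply returns that column. `F.first(F.coalesce("code"))` is therefore the same as `F.first("code")`, which takes whatever the first row of the group holds, null included. The plain-Python model below contrasts the two behaviours on the question's rows. It is only a sketch: `first_value`, `first_non_null` and `aggregate` are hypothetical helpers, not Spark APIs, and it assumes rows are seen in input order, which Spark itself does not guarantee within a group after a shuffle.

```python
from itertools import groupby
from operator import itemgetter
from typing import Any, Iterable, Optional

# Rows from the question: (id, code, name)
rows = [
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
]

def first_value(values: Iterable[Any]) -> Any:
    """Like first() without ignoring nulls: the value of the first row seen."""
    for v in values:
        return v
    return None

def first_non_null(values: Iterable[Any]) -> Optional[Any]:
    """The desired behaviour: skip None and return the first real value."""
    return next((v for v in values if v is not None), None)

def aggregate(rows, pick):
    out = []
    # itertools.groupby needs input sorted by key; sorted() is stable,
    # so row order within a group is preserved.
    for key, grp in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        grp = list(grp)
        out.append((key, pick(r[1] for r in grp), pick(r[2] for r in grp)))
    return out

print(aggregate(rows, first_value))     # [('a', None, None)]
print(aggregate(rows, first_non_null))  # [('a', 'code1', 'name2')]
```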
Upvotes: 11
Views: 28560
Reputation: 4888
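The first function accepts an argument ignorenulls that can be set to true (Spark 2.0+):
Python:
from pyspark.sql.functions import col, first
df.groupby("id").agg(first(col("code"), ignorenulls=True).alias("code"),
                     first(col("name"), ignorenulls=True).alias("name"))
Scala:
import org.apache.spark.sql.functions.{col, first}
df.groupBy("id").agg(first(col("code"), ignoreNulls = true).alias("code"),
                     first(col("name"), ignoreNulls = true).alias("name"))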
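One way to see why ignorenulls still gives the right answer when a group's rows are spread over several partitions: an aggregate like this can keep a small (value, value_set) buffer per partition and then merge the buffers, preferring the left one if it already found a value. The plain-Python model below is a sketch loosely modelled on that idea; `FirstBuffer`, `update`, `merge` and `first_agg` are hypothetical names, not Spark code.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, List, Optional

@dataclass(frozen=True)
class FirstBuffer:
    value: Optional[Any] = None
    value_set: bool = False

def update(buf: FirstBuffer, v: Any, ignore_nulls: bool) -> FirstBuffer:
    # Once a value is set, keep it; with ignore_nulls, None never counts as "set".
    if buf.value_set or (ignore_nulls and v is None):
        return buf
    return FirstBuffer(v, True)

def merge(left: FirstBuffer, right: FirstBuffer) -> FirstBuffer:
    # Prefer the earlier partition's value if it found one.
    return left if left.value_set else right

def first_agg(partitions: List[List[Any]], ignore_nulls: bool = False) -> Optional[Any]:
    # Partial aggregation inside each partition, then a final merge.
    partials = [reduce(lambda b, v: update(b, v, ignore_nulls), part, FirstBuffer())
                for part in partitions]
    return reduce(merge, partials, FirstBuffer()).value

codes = [[None], ["code1", "code2"]]        # the "code" column split over two partitions
print(first_agg(codes))                     # None
print(first_agg(codes, ignore_nulls=True))  # code1
```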
Upvotes: 4
Reputation: 22832
Because I only had one non-null value for every grouping, using min / max in 1.6 worked for my purposes:
(df
    .groupby("id")
    .agg(F.min("code"),
         F.min("name"))
    .show())
+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
| a| code1| name2|
+---+---------+---------+
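The reason this works: SQL-style min and max skip nulls, so with exactly one non-null value per group both return that value. With several non-null values they return the smallest or largest one rather than the first; min("code") gives code1 above only because it sorts before code2. A plain-Python model of that behaviour (`sql_min` and `sql_max` are hypothetical helpers, not Spark functions):

```python
from typing import Iterable, Optional

def sql_min(values: Iterable[Optional[str]]) -> Optional[str]:
    """SQL-style MIN: nulls are skipped; an all-null group yields None."""
    non_null = [v for v in values if v is not None]
    return min(non_null) if non_null else None

def sql_max(values: Iterable[Optional[str]]) -> Optional[str]:
    """SQL-style MAX: nulls are skipped; an all-null group yields None."""
    non_null = [v for v in values if v is not None]
    return max(non_null) if non_null else None

names = [None, None, "name2"]     # exactly one non-null value
codes = [None, "code1", "code2"]  # two non-null values

print(sql_min(names), sql_max(names))  # name2 name2
print(sql_min(codes), sql_max(codes))  # code1 code2
```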
Upvotes: 5
Reputation: 17872
For Spark 1.3 - 1.5, this could do the trick:
from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()
+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
| a| code1| name2|
+---+-----------+-----------+
Edit
Apparently, Spark 1.6 changed how the first aggregate function is processed. The underlying class First now takes a second argument, ignoreNullsExpr, but the first aggregate function does not use it yet (as can be seen here). In Spark 2.0, however, you will be able to call agg(F.first(col, True)) to ignore nulls (as can be checked here).
Therefore, on Spark 1.6 a different, and unfortunately less efficient, approach is needed. One idea is to drop the nulls of each column separately, take the first remaining value per id, and join the partial results:
from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()
+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
| a| code1| name2|
+---+-------------+-------------+
Maybe there is a better option. I'll edit the answer if I find one.
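The filter, group and join steps can be modelled in plain Python as below. This is a sketch: `first_non_null_by_id` is a hypothetical helper, and "first" again assumes input order. One consequence worth noting is that df1.join(df2, 'id') is an inner join, so an id whose code (or name) is null in every row disappears from the result; passing how='outer' to join would keep such ids with nulls instead.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, Optional[str], Optional[str]]  # (id, code, name)

rows: List[Row] = [
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
]

def first_non_null_by_id(rows: List[Row], col: int) -> Dict[str, str]:
    """Drop nulls in one column, then keep the first remaining value per id."""
    out: Dict[str, str] = {}
    for row in rows:
        if row[col] is not None:               # like filter(isNotNull())
            out.setdefault(row[0], row[col])   # like groupBy(id) + first
    return out

codes = first_non_null_by_id(rows, 1)
names = first_non_null_by_id(rows, 2)
# Inner join on id: ids missing from either side are dropped.
result = [(k, codes[k], names[k]) for k in codes if k in names]
print(result)  # [('a', 'code1', 'name2')]
```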
Upvotes: 24