Kamil Sindi

Reputation: 22832

Get first non-null values in group by (Spark 1.6)

How can I get the first non-null values from a group by? I tried using first with coalesce, F.first(F.coalesce("code")), but I don't get the desired behaviour (I seem to get the first row, nulls included).

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

I tried:

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")),
       F.first(F.coalesce("name")))
  .collect())

DESIRED OUTPUT

[Row(id='a', code='code1', name='name2')]
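
For contrast, a hedged sketch (the aliases are mine) of what the attempt above produces on this example: coalesce over a single column is a no-op, and first without ignorenulls just takes the value from the first row it processes, which here is the all-null row.

(df
  .groupby("id")
  .agg(F.first(F.coalesce("code")).alias("code"),
       F.first(F.coalesce("name")).alias("name"))
  .show())

# Likely output on this local, single-partition example (first without
# ignorenulls is not guaranteed to be deterministic):
# +---+----+----+
# | id|code|name|
# +---+----+----+
# |  a|null|null|
# +---+----+----+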

Upvotes: 11

Views: 28560

Answers (3)

Abdennacer Lachiheb

Reputation: 4888

The first function accepts an ignorenulls argument, which can be set to true:

Python:

df.groupby("id").agg(first(col("code"), ignorenulls=True).alias("code"))

Scala:

df.groupBy("id").agg(first(col("code"), ignoreNulls = true).alias("code"))

Upvotes: 4

Kamil Sindi

Reputation: 22832

Because I only had one non-null value for every grouping, using min / max in 1.6 worked for my purposes:

(df
  .groupby("id")
  .agg(F.min("code"),
       F.min("name"))
  .show())

+---+---------+---------+
| id|min(code)|min(name)|
+---+---------+---------+
|  a|    code1|    name2|
+---+---------+---------+
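
As a caveat sketch (the extra row below is hypothetical, not from the question): with more than one non-null value per group, min returns the lexicographically smallest value rather than the first one encountered, so this shortcut only matches "first non-null" under the single-non-null assumption stated above.

df2 = sqlContext.createDataFrame([
    ("a", "code9", None),       # hypothetical row: this non-null value comes first
    ("a", "code1", "name2"),
], ["id", "code", "name"])

# min picks "code1" (lexicographically smallest), even though "code9" appeared first.
df2.groupby("id").agg(F.min("code")).show()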

Upvotes: 5

Daniel de Paula

Reputation: 17872

For Spark 1.3 - 1.5, this could do the trick:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code']), F.first(df['name'])).show()

+---+-----------+-----------+
| id|FIRST(code)|FIRST(name)|
+---+-----------+-----------+
|  a|      code1|      name2|
+---+-----------+-----------+

Edit

Apparently, in version 1.6 they changed the way the first aggregate function is processed. Now, the underlying First class is constructed with a second ignoreNullsExpr parameter, which is not yet used by the first aggregate function (as can be seen here). However, in Spark 2.0 it will be possible to call agg(F.first(col, True)) to ignore nulls (as can be checked here).
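
For completeness, a sketch of that Spark 2.0+ call on the question's DataFrame (it does not work on 1.6, hence the workaround below); the second positional argument is the ignorenulls flag:

from pyspark.sql import functions as F
df.groupBy(df['id']).agg(F.first(df['code'], True), F.first(df['name'], True)).show()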

Therefore, for Spark 1.6 the approach must be different and, unfortunately, a little less efficient. One idea is the following:

from pyspark.sql import functions as F
df1 = df.select('id', 'code').filter(df['code'].isNotNull()).groupBy(df['id']).agg(F.first(df['code']))
df2 = df.select('id', 'name').filter(df['name'].isNotNull()).groupBy(df['id']).agg(F.first(df['name']))
result = df1.join(df2, 'id')
result.show()

+---+-------------+-------------+
| id|first(code)()|first(name)()|
+---+-------------+-------------+
|  a|        code1|        name2|
+---+-------------+-------------+

Maybe there is a better option. I'll edit the answer if I find one.

Upvotes: 24
