Reputation: 1185
I have a very large dataset loaded in Hive (about 1.9 million rows and 1450 columns). I need to determine the "coverage" of each column, that is, the fraction of rows that have non-NaN values for that column.
Here is my code:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs = sc.parallelize(df.columns) \
    .map(lambda x: str(x)) \
    .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Trying this out in the PySpark shell, if I then do covgs.take(10), it returns a rather large error stack. It says there's a problem in save in the file /usr/lib64/python2.6/pickle.py. This is the final part of the error:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Is there a better way to accomplish this? I can't use pandas, though, as it's not currently available on the cluster I work on and I don't have access rights to install it.
Upvotes: 44
Views: 45607
Reputation: 24478
For string and numeric columns, summary is convenient.
Count non-nulls:
df.summary("count").show()
Count non-NaN:
df.replace(float("nan"), None).summary("count").show()
Note: summary does not return columns of types other than string or numeric (e.g. date columns are omitted from the result).
Full test:
df = spark.createDataFrame(
    [(0.0, 1, 2, float("Nan")),
     (None, 3, 4, 5.0),
     (None, None, 6, 7.0),
     (float("Nan"), 8, 9, 7.0)],
    ["v", "x", "y", "z"])
df.show()
# +----+----+---+---+
# | v| x| y| z|
# +----+----+---+---+
# | 0.0| 1| 2|NaN|
# |null| 3| 4|5.0|
# |null|null| 6|7.0|
# | NaN| 8| 9|7.0|
# +----+----+---+---+
df.summary("count").show()
# +-------+---+---+---+---+
# |summary| v| x| y| z|
# +-------+---+---+---+---+
# | count| 2| 3| 4| 4|
# +-------+---+---+---+---+
df.replace(float("nan"), None).summary("count").show()
# +-------+---+---+---+---+
# |summary| v| x| y| z|
# +-------+---+---+---+---+
# | count| 1| 3| 4| 3|
# +-------+---+---+---+---+
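If the goal is the coverage fraction from the question, the counts returned by summary can be divided by the total row count on the driver. A minimal sketch, keeping in mind that summary returns its values as strings:
total = df.count()
counts = df.replace(float("nan"), None).summary("count").collect()[0].asDict()
coverage = {c: int(n) / float(total) for c, n in counts.items() if c != "summary"}
# e.g. {'v': 0.25, 'x': 0.75, 'y': 1.0, 'z': 0.75} for the test data above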
Upvotes: 0
Reputation: 71
from pyspark.sql import functions as F

z = df.count()  # total number of rows

# Replace NaN with NULL, then count() (which ignores NULLs) divided by the
# total row count gives the per-column coverage fraction.
(df.replace(float('nan'), None)
    .agg(*[F.expr(f'count({col})/{z} as {col}') for col in df.columns])
).show()
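The same aggregation can also be written with the column API instead of assembling SQL strings; a roughly equivalent sketch:
(df.replace(float('nan'), None)
    .agg(*[(F.count(c) / z).alias(c) for c in df.columns])
).show()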
Upvotes: 0
Reputation: 13778
You may get a data type mismatch exception if you apply isnan to a non-numeric column, e.g.:
org.apache.spark.sql.AnalysisException: cannot resolve 'isnan(`date_hour`)' due to data type mismatch: argument 1 requires (double or float) type, however, '`date_hour`' is of timestamp type.;
It is better to select the numerical columns first:
from pyspark.sql.functions import *
def get_numerical_cols(df):
    # keep only columns whose Spark type supports isnan
    return [i.name for i in df.schema
            if str(i.dataType) in ('IntegerType', 'LongType', 'FloatType', 'DoubleType')]

numcols = get_numerical_cols(df)

df_nan_rate = df.select([(count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1))).alias(c)
                         for c in numcols])
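For the remaining non-numeric columns (such as the timestamp column from the exception above), isnan does not apply, so only a null check can be used; a possible sketch covering both groups:
othercols = [i.name for i in df.schema if i.name not in numcols]
df_missing_rate = df.select(
    [(count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1))).alias(c) for c in numcols]
    + [(count(when(col(c).isNull(), c)) / count(lit(1))).alias(c) for c in othercols]
)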
Upvotes: 0
Reputation: 2421
You can use isNotNull():
df.where(df[YOUR_COLUMN].isNotNull()).select(YOUR_COLUMN).show()
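On its own this only filters one column; to get the coverage fraction the question asks for, the filtered count could be divided by the total row count (a sketch reusing the YOUR_COLUMN placeholder from above):
total = df.count()
coverage = df.where(df[YOUR_COLUMN].isNotNull()).count() / float(total)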
Upvotes: 0
Reputation: 330353
Let's start with some dummy data:
from pyspark.sql import Row
row = Row("v", "x", "y", "z")
df = sc.parallelize([
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
    row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN"))
]).toDF()
## +----+----+---+---+
## | v| x| y| z|
## +----+----+---+---+
## | 0.0| 1| 2|3.0|
## |null| 3| 4|5.0|
## |null|null| 6|7.0|
## | NaN| 8| 9|NaN|
## +----+----+---+---+
All you need is a simple aggregation:
from pyspark.sql.functions import col, count, isnan, lit, sum
def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)
df.agg(*[count_not_null(c) for c in df.columns]).show()
## +---+---+---+---+
## | v| x| y| z|
## +---+---+---+---+
## | 2| 3| 4| 4|
## +---+---+---+---+
or if you want to treat NaN as NULL:
df.agg(*[count_not_null(c, True) for c in df.columns]).show()
## +---+---+---+---+
## | v| x| y| z|
## +---+---+---+---+
## | 1| 3| 4| 3|
## +---+---+---+---+
You can also leverage SQL NULL semantics to achieve the same result without creating a custom function:
df.agg(*[
count(c).alias(c) # vertical (column-wise) operations in SQL ignore NULLs
for c in df.columns
]).show()
## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
but this won't work with NaNs.
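If NaN-aware counts are needed with the built-in SQL functions only, one possible workaround (a sketch that assumes the columns are numeric, so isnan resolves) is to null out NaNs inside the count:
from pyspark.sql.functions import when

df.agg(*[
    count(when(~isnan(c), col(c))).alias(c)  # NaN -> NULL, so COUNT skips it
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+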
If you prefer fractions:
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()
## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
or
# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()
## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
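To get the fractions back into plain Python (for example to scan all 1450 columns for low-coverage ones), the single result row can be collected as a dict; a small sketch, where the 0.5 threshold is just an arbitrary example:
coverage = df.agg(*[
    (count_not_null(c, True) / count("*")).alias(c) for c in df.columns
]).first().asDict()

low_coverage = {c: frac for c, frac in coverage.items() if frac < 0.5}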
Scala equivalent:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, isnan, lit, not, sum}

type JDouble = java.lang.Double

val df = Seq[(JDouble, JDouble, JDouble, JDouble)](
  (0.0, 1.0, 2.0, 3.0), (null, 3.0, 4.0, 5.0),
  (null, null, 6.0, 7.0), (java.lang.Double.NaN, 8.0, 9.0, java.lang.Double.NaN)
).toDF()

def count_not_null(c: Column, nanAsNull: Boolean = false) = {
  val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true))
  sum(pred.cast("integer"))
}
df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 2| 3| 4| 4|
// +---+---+---+---+
df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 1| 3| 4| 3|
// +---+---+---+---+
Upvotes: 90