toddysm
toddysm

Reputation: 638

Filtering rows with empty arrays in PySpark

We are trying to filter rows that contain empty arrays in a field using PySpark. Here is the schema of the DF:

root
 |-- created_at: timestamp (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- text: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)

We are trying two approaches.

First, defining UDF that can modify the rows like this

empty_array_to_null = udf(lambda arr: None if len(arr) == 0 else arr, ArrayType(StructType()))

and use it to exclude the rows in df.select(empty_array_to_null(df.user_mentions)).

The other approach is to have the following UDF:

is_empty = udf(lambda x: len(x) == 0, BooleanType())

and use it in df.filter(is_empty(df.user_mentions))

Both approaches throw errors. First approach yields the following:

An error occurred while calling o3061.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1603.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1603.0 (TID 41390, 10.0.0.11): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 0 fields are required while 5 values are provided.
at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:136)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$fromJava$1.apply(EvaluatePython.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)

Second approach throws the following:

Some of types cannot be determined by the first 100 rows, please try again with sampling
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 57, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 522, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 347, in _inferSchema
    raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
...

Update: Added sample data...

+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|          created_at|   screen_name|                text|retweet_count|favorite_count|in_reply_to_status_id|in_reply_to_user_id|in_reply_to_screen_name|user_mentions|            hashtags|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|2017-03-13 23:00:...|  danielmellen|#DevOps understan...|            0|             0|                 null|               null|                   null|           []|            [devops]|
|2017-03-13 23:00:...|     RebacaInc|Automation of ent...|            0|             0|                 null|               null|                   null|           []|[googlecloud, orc...|
|2017-03-13 23:00:...| CMMIAppraiser|Get your Professi...|            0|             0|                 null|               null|                   null|           []|        [broadsword]|
|2017-03-13 23:00:...|       usxtron|and when the syst...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|     SearchCRM|.#Automation and ...|            0|             0|                 null|               null|                   null|           []|[automation, chat...|
|2017-03-13 23:00:...|  careers_tech|SummitSync - Juni...|            0|             0|                 null|               null|                   null|           []|[junior, cloud, e...|
|2017-03-13 23:00:...|    roy_lauzon|Both the #DevOps ...|            0|             0|                 null|               null|                   null|           []|[devops, cybersec...|
|2017-03-13 23:00:...|      nosqlgal|Introducing #Couc...|            0|             0|                 null|               null|                   null|           []|  [couchbase, nosql]|
|2017-03-13 23:00:...|  jordanfarrer|Ran into a weird ...|            0|             0|                 null|               null|                   null|           []|            [docker]|
|2017-03-13 23:00:...|    BGrieveSTL|#purestorage + #a...|            0|             0|                 null|               null|                   null|           []|[purestorage, azure]|
|2017-03-13 23:00:...| Hotelbeds_API|"How to Quickly O...|            0|             0|                 null|               null|                   null|           []|       [api, feedly]|
|2017-03-13 23:00:...|  ScalaWilliam|Principles behind...|            0|             0|                 null|               null|                   null|           []|             [agile]|
|2017-03-13 23:00:...|   PRFT_Oracle|[On-Demand Webina...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|    PDF_filler|Now you can #secu...|            0|             0|                 null|               null|                   null|           []|[secure, data, ap...|
|2017-03-13 23:00:...|lgoncalves1979|10 Mistakes We Ma...|            0|             0|                 null|               null|                   null|           []|[coaching, scrumm...|
|2017-03-13 23:00:...|       Jelecos|Vanguard CIO: Why...|            0|             0|                 null|               null|                   null|           []|[microservices, cio]|
|2017-03-13 23:00:...|   DJGaryBaldy|Why bother with W...|            0|             0|                 null|               null|                   null|           []|        [automation]|
|2017-03-13 23:00:...|     1codeblog|Apigee Edge Produ...|            0|             0|                 null|               null|                   null|           []|[cloud, next17, g...|
|2017-03-13 23:00:...|     CloudRank|Why and when shou...|            0|             0|                 null|               null|                   null|           []|[machinelearning,...|
|2017-03-13 23:00:...|  forgeaheadio|5 essentials for ...|            0|             0|                 null|               null|                   null|           []|[hybrid, cloud, h...|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
only showing top 20 rows

Upvotes: 19

Views: 32236

Answers (3)

travelingbones
travelingbones

Reputation: 8448

df[ df.user_mentions != F.array() ]

To see why this works, note that df.user_mentions != F.array() is a column object with boolean entries, so passing it to df filters df on the rows that have nonempty array in the user_mentions column.

Upvotes: 0

matthiash
matthiash

Reputation: 3243

array() creates an empty array that can be compared against.

df = spark.createDataFrame([
  ["ABC", ["a", "b"]],
  ["DEF", []],
  ["GHI", ["c"]],
  ["JKL", []]
], ["name", "user_mentions"])

import pyspark.sql.functions as F

df_with = df.filter(F.col("user_mentions")!=F.array())
df_without = df.filter(F.col("user_mentions")==F.array())

Upvotes: 5

Sophie D.
Sophie D.

Reputation: 381

One of the way is to first get the size of your array, and then filter on the rows which array size is 0. I have found the solution here How to convert empty arrays to nulls?.

import pyspark.sql.functions as F
df = df.withColumn("size", F.size(F.col(user_mentions)))
df_filtered = df.filter(F.col("size") >= 1)

Upvotes: 32

Related Questions