Supriya

Reputation: 51

Filter array of struct in spark dataframe

I have a JSON file that I am reading into a Spark DataFrame using Scala 2.10 with

val df = sqlContext.read.json("file_path")

JSON looks like below:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Client"},{"name": "Market" }]}, { "id":"20180220","parent": [{"name": "Client"}]},{ "id":"20180221","parent": []}]}

data is an array of structs. Each struct has a parent key, and parent is itself an array of structs that can hold zero or more values.

I need to filter each parent array so that it holds only the structs whose name is "Market", or nothing at all. My output should look like:

{ "data": [{ "id":"20180218","parent": [{"name": "Market"}]}, { "id":"20180219","parent": [{"name": "Market" }]}, { "id":"20180220","parent": []},{ "id":"20180221","parent": []}]}

So, basically, filter out every struct whose name is anything other than "Market", and keep empty parent arrays (whether they become empty as a result of the filtering or were already empty).

Can somebody help out here?

Thanks

Upvotes: 5

Views: 20823

Answers (2)

elghoto

Reputation: 303

Assuming you have a column holding an array of structs, in your case parent, what you need is the filter function. I believe some people have already said that. The trick is that the filtering function needs to work on a struct.

According to the documentation for Column.apply (https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html#apply(extraction:Any):org.apache.spark.sql.Column; a quick demo of these extractions follows the list):

Extracts a value or values from a complex type. The following types of extraction are supported:

  • Given an Array, an integer ordinal can be used to retrieve a single value.
  • Given a Map, a key of the correct type can be used to retrieve an individual value.
  • Given a Struct, a string fieldName can be used to extract that field.
  • Given an Array of Structs, a string fieldName can be used to extract that field of every struct in that array, and return an Array of fields.
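
For concreteness, a minimal sketch of those extraction forms against the df from the question (column and field names come from the question's JSON; this is illustrative only):

import org.apache.spark.sql.functions.col

df.select(col("data")(0))           // Array + integer ordinal: the first struct in data
df.select(col("data")(0)("id"))     // Struct + string fieldName: the id of that struct
df.select(col("data")("parent"))    // Array of Structs + fieldName: every struct's parent field, as an array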

Therefore filtering is as simple as:

df.withColumn("filtered", filter(col("parent"), (c: Column) => c.apply("name") === "Market")

I believe this is the most efficient and cleanest way. Note that filter as a function in org.apache.spark.sql.functions requires Spark 3.0+.
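
In the question, parent is nested inside the data array, so the same idea has to be applied per element. A minimal sketch of one way to do that, again assuming Spark 3.0+ (transform rebuilds each element of data, and struct reassembles it with a filtered parent):

import org.apache.spark.sql.functions.{col, filter, struct, transform}

// Rebuild data, keeping only the parent structs named "Market";
// already-empty parent arrays simply stay empty.
val result = df.withColumn("data",
  transform(col("data"), d =>
    struct(
      d("id").as("id"),
      filter(d("parent"), p => p("name") === "Market").as("parent")
    )
  )
)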

Upvotes: 2

Pavithran Ramachandran

Reputation: 993

We need to use the explode function to achieve this sort of query over nested JSON structs and arrays.

scala> val df = spark.read.json("/Users/pavithranrao/Desktop/test.json")

scala> df.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> val oneDF = df.select(col("data"), explode(col("data"))).toDF("data", "element").select(col("data"), col("element.parent"))
scala> oneDF.show
"""
+--------------------+--------------------+
|                data|              parent|
+--------------------+--------------------+
|[[20180218,Wrappe...|          [[Market]]|
|[[20180218,Wrappe...|[[Client], [Market]]|
|[[20180218,Wrappe...|          [[Client]]|
|[[20180218,Wrappe...|                  []|
+--------------------+--------------------+
"""

scala> val twoDF = oneDF.select(col("data"), explode(col("parent"))).toDF("data", "names")
scala> twoDF.printSchema
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- names: struct (nullable = true)
 |    |-- name: string (nullable = true)

scala> twoDF.show
"""
+--------------------+--------+
|                data|   names|
+--------------------+--------+
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
|[[20180218,Wrappe...|[Market]|
|[[20180218,Wrappe...|[Client]|
+--------------------+--------+
"""

scala> import org.apache.spark.sql.functions.length

// Check which names entries are empty
scala> twoDF.select(length(col("names.name")) === 0).show
+------------------------+
|(length(names.name) = 0)|
+------------------------+
|                   false|
|                   false|
|                   false|
|                   false|
+------------------------+

// Check which names entries don't contain "Market"
scala> twoDF.select(!col("names.name").contains("Market")).show()
+----------------------------------+
|(NOT contains(names.name, Market))|
+----------------------------------+
|                             false|
|                              true|
|                             false|
|                              true|
+----------------------------------+

// Combining these two

scala> val ansDF = twoDF.select("data").filter(!col("names.name").contains("Market") || length(col("names.name")) === 0)
scala> ansDF.printSchema

// Schema same as input df
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- parent: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- name: string (nullable = true)

scala> ansDF.show(false)
+----------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
|[[20180218,WrappedArray([Market])], [20180219,WrappedArray([Client], [Market])], [20180220,WrappedArray([Client])], [20180221,WrappedArray()]]|
+----------------------------------------------------------------------------------------------------------------------------------------------+

The final ansDF has the filtered records that satisfy the condition: name does not contain 'Market' or is empty.
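
Note that the explode approach selects whole matching records rather than rewriting the nested parent arrays in place. For older Spark versions without higher-order functions (pre-2.4), one way to prune the arrays themselves is a UDF. A rough sketch, assuming struct values arrive in the UDF as Rows (the Parent and Entry case classes and the filterParents name are illustrative, not from the original post):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

case class Parent(name: String)
case class Entry(id: String, parent: Seq[Parent])

// Rebuild every element of data, keeping only parent structs named "Market".
val filterParents = udf { data: Seq[Row] =>
  data.map { e =>
    val parents = e.getSeq[Row](e.fieldIndex("parent")).map(r => Parent(r.getString(0)))
    Entry(e.getAs[String]("id"), parents.filter(_.name == "Market"))
  }
}

val prunedDF = df.withColumn("data", filterParents(col("data")))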

PS: If I have missed the exact filter scenario, adjust the condition in the filter call in the code above.

Hope this helps!

Upvotes: 5
