Brad Hein

Reputation: 11057

How to explode structs with pyspark explode()

How do I convert the following JSON into the relational rows shown below it? The part I am stuck on is that the pyspark explode() function throws an exception due to a type mismatch. I have not found a way to coerce the data into a format that lets me create one row from each object under the source key of the sample_json object.

JSON INPUT

sample_json = """
{
"dc_id": "dc-101",
"source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo": {"lat":38.00, "long":97.00}                        
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": {"lat":47.41, "long":-122.00}
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": {"lat":33.61, "long":-111.89}
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": {"lat":35.93, "long":-85.46}
    }
  }
}"""

DESIRED OUTPUT

dc_id    source_name    id    description
-------------------------------------------------------------------------------
dc-101   sensor-igauge  10    Sensor attached to the container ceilings
dc-101   sensor-ipad    13    Sensor ipad attached to carbon cylinders
dc-101   sensor-inest    8    Sensor attached to the factory ceilings
dc-101   sensor-istick   5    Sensor embedded in exhaust pipes in the ceilings

PYSPARK CODE

from pyspark.sql.functions import *
df_sample_data = spark.read.json(sc.parallelize([sample_json]))
df_expanded = df_sample_data.withColumn("one_source",explode_outer(col("source")))
display(df_expanded)

ERROR

AnalysisException: cannot resolve 'explode(source)' due to data type mismatch: input to function explode should be array or map type, not struct....

I put together this Databricks notebook to further demonstrate the challenge and clearly show the error. I will be able to use this notebook to test any recommendations provided herein.
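Before involving Spark at all, the shape of the desired transformation can be sanity-checked in plain Python: each key under source becomes a source_name, and its nested object supplies the remaining columns. This is only a sketch of the row logic (using an abbreviated copy of the sample JSON), not a Spark solution:

```python
import json

# Abbreviated version of sample_json (two sensors, two fields each);
# the structure mirrors the JSON in the question.
sample_json = """
{
  "dc_id": "dc-101",
  "source": {
    "sensor-igauge": {"id": 10, "description": "Sensor attached to the container ceilings"},
    "sensor-ipad":   {"id": 13, "description": "Sensor ipad attached to carbon cylinders"}
  }
}"""

data = json.loads(sample_json)

# One output row per key in "source": the key becomes source_name,
# and the nested object supplies id and description.
rows = [
    {"dc_id": data["dc_id"], "source_name": name,
     "id": sensor["id"], "description": sensor["description"]}
    for name, sensor in data["source"].items()
]
```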

Upvotes: 3

Views: 5322

Answers (1)

blackbishop

Reputation: 32700

You can't use explode on structs, but you can get the field names inside the source struct (with df.select("source.*").columns), use a list comprehension to build an array of structs holding the fields you want from each nested struct, and then explode that array to get the desired result:

from pyspark.sql import functions as F

df1 = df.select(
    "dc_id",
    F.explode(
        F.array(*[
            F.struct(
                F.lit(s).alias("source_name"),
                F.col(f"source.{s}.id").alias("id"),
                F.col(f"source.{s}.description").alias("description")
            )
            for s in df.select("source.*").columns
        ])
    ).alias("sources")
).select("dc_id", "sources.*")

df1.show(truncate=False)

#+------+-------------+---+------------------------------------------------+
#|dc_id |source_name  |id |description                                     |
#+------+-------------+---+------------------------------------------------+
#|dc-101|sensor-igauge|10 |Sensor attached to the container ceilings       |
#|dc-101|sensor-inest |8  |Sensor attached to the factory ceilings         |
#|dc-101|sensor-ipad  |13 |Sensor ipad attached to carbon cylinders        |
#|dc-101|sensor-istick|5  |Sensor embedded in exhaust pipes in the ceilings|
#+------+-------------+---+------------------------------------------------+

Upvotes: 4
