Ram

Reputation: 31

pyspark: Is it possible to create array with missing elements in one struct

My input DataFrame schema is shown below. The difference between elements 1 and 2 in d is that 1 has the attributes a, b, c, and d, while 2 has only a, b, and c.

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)
 |    |    |-- d: double (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: string (nullable = true)

I am trying to explode the elements of d using the code below:

df2 = inputDF.withColumn("d1",f.explode(f.array("d.*").getField("c")))

and am getting this error:

pyspark.sql.utils.AnalysisException: cannot resolve 'array(d.1, d.2)' due to data type mismatch:
input to function array should all be the same type, but it's
[struct<a:string,b:string,c:string,d:double>, struct<a:string,b:string,c:string>];
'Project [a#832, b#833, c#834, d#835, explode(array(d#835.1, d#835.2)[c]) AS d1#843]
+- Relation[a#832,b#833,c#834,d#835] json

Is there any way to instruct the function to assume NULLs for the missing columns in the input to the array function?

Upvotes: 0

Views: 466

Answers (1)

Felix K Jose

Reputation: 892

You can explode an array of structs in which one of the elements is missing a field, as in your case, as follows:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("SparkTesting") \
    .getOrCreate()

# Model d as an array of a single struct type; the element that is
# missing "d" simply gets null for that field.
d_schema = ArrayType(StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
]))
df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": [
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
            "d": "d1"
        },
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        }
    ],
}

df = spark.createDataFrame([item1], schema=df_schema)

df.printSchema()
df.show(truncate=False)

# explode() yields one output row per element of the array "d"
df2 = df.withColumn("d1", f.explode(col("d")))
df2.printSchema()
df2.show(truncate=False)
df2.select("d1.c").show()

+---+---+---+--------------------------------------+------------------+
|a  |b  |c  |d                                     |d1                |
+---+---+---+--------------------------------------+------------------+
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, d1}  |
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, null}|
+---+---+---+--------------------------------------+------------------+

If you are not sure whether the array field d itself can be null, it is advisable to use the explode_outer() function instead of explode().
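For example, a minimal sketch reusing the df built above (df_outer is just an illustrative name):

# explode_outer() keeps the parent row and emits a null d1 when the
# array column "d" is null or empty; plain explode() would drop the row.
df_outer = df.withColumn("d1", f.explode_outer(col("d")))
df_outer.select("a", "d1.c").show()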

As per the comment, to match the schema in the question (where d is a struct of structs rather than an array), the code below will work:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()
d_inter_schema = (StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
]))
# Match the question's schema: d is a struct with fields "1" and "2",
# both sharing the same inner struct type.
d_schema = StructType().add("1", d_inter_schema, nullable=True).add("2", d_inter_schema, nullable=True)

df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": {"1": {
        "a": "a1",
        "b": "b1",
        "c": "c1",
        "d": "d1"
    },
        "2": {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        }
    },
}

df = spark.createDataFrame([item1], schema=df_schema)

df.printSchema()
df.show(truncate=False)

+---+---+---+--------------------------------------+
|a  |b  |c  |d                                     |
+---+---+---+--------------------------------------+
|a1 |b1 |c1 |{{a1, b1, c1, d1}, {a1, b1, c1, null}}|
+---+---+---+--------------------------------------+
df.select("d.1.c", "d.2.c").show()
+---+---+
|  c|  c|
+---+---+
| c1| c1|
+---+---+
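With this struct-typed schema, the original error can be avoided by padding the missing field yourself before calling array(), so that both elements share one struct type. A minimal sketch against the df just built (the null is cast to string to match this reconstruction; with the question's original schema, where d.1.d is a double, cast to "double" instead):

import pyspark.sql.functions as f

# Pad d.2 with a typed null "d" field so it matches the type of d.1,
# then pull the "c" values into an array and explode them.
df2 = df.withColumn(
    "d1",
    f.explode(
        f.array(
            f.col("d.1"),
            f.struct(
                f.col("d.2.a").alias("a"),
                f.col("d.2.b").alias("b"),
                f.col("d.2.c").alias("c"),
                f.lit(None).cast("string").alias("d"),
            ),
        ).getField("c")
    ),
)
df2.select("a", "d1").show()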

Upvotes: 1
