Reputation: 31
My input DataFrame's schema is shown below. The difference between elements 1 and 2 in d is that 1 has the attributes a, b, c, d while 2 has only a, b, c:
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- c: string (nullable = true)
|-- d: struct (nullable = true)
| |-- 1: struct (nullable = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
| | |-- d: double (nullable = true)
| |-- 2: struct (nullable = true)
| | |-- a: string (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
I am trying to explode the elements of d using the code below:
df2 = inputDF.withColumn("d1",f.explode(f.array("d.*").getField("c")))
and getting the error:
pyspark.sql.utils.AnalysisException: cannot resolve 'array(d.1, d.2)' due to data type mismatch: input to function array should all be the same type, but it's [struct<a:string,b:string,c:string,d:double>, struct<a:string,b:string,c:string>];
'Project [a#832, b#833, c#834, d#835, explode(array(d#835.1, d#835.2)[c]) AS d1#843]
+- Relation[a#832,b#833,c#834,d#835] json
Is there any way to instruct the function to assume NULLs for the missing columns in the input to the array function?
Upvotes: 0
Views: 466
Reputation: 892
You can explode an array of structs where one of the elements is missing a field, as in your case, as follows:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, ArrayType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("SparkTesting") \
    .getOrCreate()

d_schema = ArrayType(StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
]))

df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": [
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
            "d": "d1"
        },
        {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        },
    ],
}

df = spark.createDataFrame([item1], schema=df_schema)
df.printSchema()
df.show(truncate=False)

df2 = df.withColumn("d1", f.explode(col("d")))
df2.printSchema()
df2.show(truncate=False)
df2.select("d1.c").show()
+---+---+---+--------------------------------------+------------------+
|a |b |c |d |d1 |
+---+---+---+--------------------------------------+------------------+
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, d1} |
|a1 |b1 |c1 |[{a1, b1, c1, d1}, {a1, b1, c1, null}]|{a1, b1, c1, null}|
+---+---+---+--------------------------------------+------------------+
In case you are not sure whether the array field d itself can be null, it is advisable to use the explode_outer() function instead of explode(), which would otherwise drop those rows.
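For instance, a minimal sketch continuing from the snippet above, with a hypothetical second row whose d is null, showing the difference:

# Hypothetical extra row where the array column "d" itself is null.
item2 = {"a": "a2", "b": "b2", "c": "c2", "d": None}
df_null = spark.createDataFrame([item1, item2], schema=df_schema)

# explode() drops the row with the null array entirely,
# while explode_outer() keeps it with d1 = null.
df_null.withColumn("d1", f.explode(col("d"))).select("a", "d1").show(truncate=False)
df_null.withColumn("d1", f.explode_outer(col("d"))).select("a", "d1").show(truncate=False)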
As per the comment, to match the schema in the question, the below code will work:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()

d_inter_schema = StructType([
    StructField('a', StringType(), nullable=True),
    StructField('b', StringType(), nullable=True),
    StructField('c', StringType(), nullable=True),
    StructField('d', StringType(), nullable=True),
])

d_schema = (StructType()
            .add("1", d_inter_schema, nullable=True)
            .add("2", d_inter_schema, nullable=True))

df_schema = (StructType()
             .add("a", StringType(), nullable=True)
             .add("b", StringType(), nullable=True)
             .add("c", StringType(), nullable=True)
             .add("d", d_schema, nullable=True))

item1 = {
    "a": "a1",
    "b": "b1",
    "c": "c1",
    "d": {
        "1": {
            "a": "a1",
            "b": "b1",
            "c": "c1",
            "d": "d1"
        },
        "2": {
            "a": "a1",
            "b": "b1",
            "c": "c1",
        },
    },
}

df = spark.createDataFrame([item1], schema=df_schema)
df.printSchema()
df.show(truncate=False)
+---+---+---+--------------------------------------+
|a |b |c |d |
+---+---+---+--------------------------------------+
|a1 |b1 |c1 |{{a1, b1, c1, d1}, {a1, b1, c1, null}}|
+---+---+---+--------------------------------------+
df.select("d.1.c", "d.2.c").show()
+---+---+
| c| c|
+---+---+
| c1| c1|
+---+---+
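To directly answer the original question: array() will not assume NULLs for missing fields, but you can pad the shorter struct with a typed null literal so both elements share one type, and then explode. A minimal sketch, assuming inputDF is the question's DataFrame (where the inner d field of d.1 is a double):

import pyspark.sql.functions as f

# Rebuild d.2 with the missing field "d" added as a typed null so its
# type matches d.1; array() then typechecks and explode() works.
padded = f.struct(
    f.col("d.2.a").alias("a"),
    f.col("d.2.b").alias("b"),
    f.col("d.2.c").alias("c"),
    f.lit(None).cast("double").alias("d"),
)
df2 = inputDF.withColumn("d1", f.explode(f.array(f.col("d.1"), padded)))
df2.select("d1.c").show()

This sidesteps the type mismatch explicitly rather than relying on array() to reconcile the struct types, which Spark will not do on its own.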
Upvotes: 1