Buzz Moschetti

Reputation: 7621

Spark processing of polymorphic JSON

Consider this JSON input (shown in multiline form for readability; the actual input documents are one per line, CR-delimited):

{
  "common": { "type":"A", "date":"2020-01-01T12:00:00" },
  "data": {
    "name":"Dave",
    "pets": [ "dog", "cat" ]
  }
}
{
  "common": { "type": "B", "date":"2020-01-01T12:00:00" },
  "data": {
    "whatever": { "X": {"foo":3}, "Y":"bar" },
    "favoriteInts": [ 0, 1, 7]
  }
}

I am familiar with JSON Schema and the way it can express that the data substructure contains either name/pets OR whatever/favoriteInts. We use the common.type field to identify the type at runtime.

Is this possible in a Spark schema definition? Initial experiments along the lines of:

    schema = StructType([
        StructField("common", StructType(common_schema)), # .. because the type is consistent                                       
        StructField("data", StructType())  # attempting to declare a "generic" struct
    ])
    df = spark.read.option("multiline", "true").json(source, schema)

does not work. Upon read, the data struct, which can contain anything (in this particular example, two fields), comes back empty:

+--------------------+----+                                                     
|              common|data|
+--------------------+----+
|{2020-01-01T12:00...|  {}|
+--------------------+----+

and trying to extract any named field yields No such struct field <whatever>. Leaving the "generic struct" out of the schema definition entirely yields a dataframe without any field named data at all, never mind the fields within it.

Beyond this, I ultimately seek to do something like this:

df = spark.read.json(source)

def processA(frame):
    frame.select( frame.data["name"] )  # we KNOW name exists for type A
    ...

def processB(frame):
    frame.select( frame.data["favoriteInts"] )  # we KNOW favoriteInts exists for type B
    ...

processA( df.filter(df.common.type == "A") )
processB( df.filter(df.common.type == "B") )

Upvotes: 1

Views: 238

Answers (1)

ggordon

Reputation: 10035

You may use nested and nullable fields in the struct (by specifying True for the nullable flag) to accommodate the uncertainty.

from pyspark.sql.types import StructType, StringType, ArrayType, StructField, IntegerType

data_schema = StructType([
    # Type A related attributes
    StructField("name",StringType(),True), # True implies nullable
    StructField("pets",ArrayType(StringType()),True),

    # Type B related attributes
    StructField("whatever",StructType([
        StructField("X",StructType([
            StructField("foo",IntegerType(),True)
        ]),True),
        StructField("Y",StringType(),True)
    ]),True), # True implies nullable
    StructField("favoriteInts",ArrayType(IntegerType()),True),
])
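
# common_schema is taken from the question; a minimal sketch, assuming both
# fields under "common" are read as plain strings, might be:
common_schema = [
    StructField("type", StringType(), True),
    StructField("date", StringType(), True)
]
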
schema = StructType([
        StructField("common", StructType(common_schema)), # .. because the type is consistent                                       
        StructField("data", data_schema)  
])
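
With this combined schema, records of type A simply carry nulls in the type-B fields and vice versa, so the read-and-dispatch flow from the question works as intended. A minimal sketch (the source path is a placeholder, and spark is the active SparkSession as in the question):

source = "/path/to/records.json"  # placeholder path

# the input is one document per line, so no multiline option is needed
df = spark.read.json(source, schema)

# rows of the other type just have nulls in the unused data fields,
# so filtering on common.type cleanly separates the two shapes
df_a = df.filter(df.common.type == "A")
df_b = df.filter(df.common.type == "B")

df_a.select(df_a.data["name"], df_a.data["pets"]).show()
df_b.select(df_b.data["favoriteInts"]).show()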

Upvotes: 2
