sparknoob

Reputation: 1286

Spark Cast StructType / JSON to String

I have a newline-delimited JSON file that looks like:

{"id":1,"nested_col": {"key1": "val1", "key2": "val2", "key3": ["arr1", "arr2"]}}
{"id":2,"nested_col": {"key1": "val1_2", "key2": "val2_2", "key3": ["arr1_2", "arr2"]}}

Once I read the file using df = spark.read.json(path_to_file), I end up with a dataframe whose schema looks like:

DataFrame[id: bigint,nested_col:struct<key1:string,key2:string,key3:array<string>>]

What I want to do is cast nested_col to a string without setting primitivesAsString to true (I actually have 100+ columns and need the types of all my other columns to be inferred). I also don't know what nested_col looks like beforehand. In other words, I'd like my DataFrame to look like:

DataFrame[id: bigint,nested_col:string]

I tried to do

df.select(df['nested_col'].cast('string')).take(1)

but it doesn't return the correct string representation of the JSON:

[Row(nested_col=u'[0,2000000004,2800000004,3000000014,316c6176,326c6176,c00000002,3172726100000010,32727261]')]

whereas I was hoping for:

[Row(nested_col=u'{"key1": "val1", "key2": "val2", "key3": ["arr1", "arr2"]}')]

Does anyone know how I can get the desired result (i.e. cast a nested JSON field / StructType to a string)?

Upvotes: 4

Views: 9274

Answers (1)

zero323

Reputation: 330063

To be honest, parsing JSON and inferring a schema just to push everything back to JSON sounds a bit strange, but here you are:

  • Required imports:

    from pyspark.sql import types
    from pyspark.sql.functions import get_json_object, struct, to_json
    
  • A helper function:

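    def jsonify(df):
        """Return df with every struct and array column serialized to a JSON string."""
        def convert(f):
            # Structs can be passed to to_json directly.
            if isinstance(f.dataType, types.StructType):
                return to_json(f.name).alias(f.name)
            # Arrays are wrapped in a one-field struct, serialized,
            # and then pulled back out of the resulting JSON object.
            if isinstance(f.dataType, types.ArrayType):
                return get_json_object(
                    to_json(struct(f.name)),
                    "$.{0}".format(f.name)
                ).alias(f.name)
            # Everything else keeps its inferred type.
            return f.name

        return df.select([convert(f) for f in df.schema.fields])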
    
  • Example usage:

    df = sc.parallelize([("a", 1, (2, 3), ["1", "2", "3"])]).toDF()
    
    jsonify(df).show()
    
    +---+---+---------------+-------------+
    | _1| _2|             _3|           _4|
    +---+---+---------------+-------------+
    |  a|  1|{"_1":2,"_2":3}|["1","2","3"]|
    +---+---+---------------+-------------+
    

Upvotes: 5
