RichardKang

Reputation: 555

PySpark UDF of MapType with mixed value type

I have a JSON input like this:


    {
      "1": {
        "id": 1,
        "value": 5
      },
      "2": {
        "id": 2,
        "list": {
          "10": {
            "id": 10
          },
          "11": {
            "id": 11
          },
          "20": {
            "id": 20
          }
        }
      },
      "3": {
        "id": 3,
        "key": "a"
      }
    }

I need to merge the 3 columns, extracting the needed value from each, and this is the output I need:


    {
      "out": {
        "1": 5,
        "2": [10, 11, 20],
        "3": "a"
      }
    }

I tried to create a UDF to transform these 3 columns into 1, but I could not figure out how to define a MapType() with mixed value types: IntegerType(), ArrayType(IntegerType()), and StringType(), respectively.
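(For reference, the required mapping can be pinned down in plain Python on the parsed JSON, independent of Spark; the function name `build_out` is illustrative, not part of any API:)

```python
import json

def build_out(data):
    # "1": keep the scalar under "value"
    # "2": collect the nested "id"s under "list"
    # "3": keep the string under "key"
    return {"out": {
        "1": data["1"]["value"],
        "2": [v["id"] for v in data["2"]["list"].values()],
        "3": data["3"]["key"],
    }}

data = json.loads("""{
  "1": {"id": 1, "value": 5},
  "2": {"id": 2, "list": {"10": {"id": 10}, "11": {"id": 11}, "20": {"id": 20}}},
  "3": {"id": 3, "key": "a"}
}""")
print(build_out(data))  # {'out': {'1': 5, '2': [10, 11, 20], '3': 'a'}}
```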

Thanks in advance!

Upvotes: 2

Views: 9913

Answers (2)

MaFF

Reputation: 10086

MapType() is used for (key, value) pair definitions with a single value type, not for nested data frames. What you're looking for is StructType().

You could load it directly using createDataFrame, but you'd have to pass a schema, so this way is easier:

import json

data_json = {
    "1": {
        "id": 1,
        "value": 5
    },
    "2": {
        "id": 2,
        "list": {
            "10": {"id": 10},
            "11": {"id": 11},
            "20": {"id": 20}
        }
    },
    "3": {
        "id": 3,
        "key": "a"
    }
}
a = [json.dumps(data_json)]   # one JSON document per RDD element
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)
df.printSchema()

    root
     |-- 1: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- value: long (nullable = true)
     |-- 2: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- list: struct (nullable = true)
     |    |    |-- 10: struct (nullable = true)
     |    |    |    |-- id: long (nullable = true)
     |    |    |-- 11: struct (nullable = true)
     |    |    |    |-- id: long (nullable = true)
     |    |    |-- 20: struct (nullable = true)
     |    |    |    |-- id: long (nullable = true)
     |-- 3: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- key: string (nullable = true)

Now to access the nested fields. Note that column "2" is nested more deeply than the others:

import pyspark.sql.functions as psf

nested_cols = ["2"]
cols = ["1", "3"]
# Turn the "list" struct of column "2" into an array of its sub-structs
df = df.select(
    cols + [psf.array(psf.col(c + ".list.*")).alias(c) for c in nested_cols]
)
# Project "id" out of each struct (element-wise for the array column "2")
df = df.select(
    [df[c].id.alias(c) for c in df.columns]
)
df.printSchema()

    root
     |-- 1: long (nullable = true)
     |-- 3: long (nullable = true)
     |-- 2: array (nullable = false)
     |    |-- element: long (containsNull = true)
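The second select works because `df[c].id` projects `id` out of a struct column, and out of *each element* when the column is an array of structs. A plain-Python analogue of one row (the `row` and `projected` names are illustrative, not Spark code):

```python
# One row as it looks after the first select above:
row = {
    "1": {"id": 1, "value": 5},
    "3": {"id": 3, "key": "a"},
    "2": [{"id": 10}, {"id": 11}, {"id": 20}],  # from psf.array(...".list.*")
}
# df[c].id behaves like this per row: scalar "id" for struct columns,
# a list of "id"s for the array-of-structs column:
projected = {c: ([e["id"] for e in v] if isinstance(v, list) else v["id"])
             for c, v in row.items()}
print(projected)  # {'1': 1, '3': 3, '2': [10, 11, 20]}
```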

It's not exactly your final output since you want it nested in an "out" column:

df.select(psf.struct("*").alias("out")).printSchema()

    root
     |-- out: struct (nullable = false)
     |    |-- 1: long (nullable = true)
     |    |-- 3: long (nullable = true)
     |    |-- 2: array (nullable = false)
     |    |    |-- element: long (containsNull = true)

Finally back to JSON:

df.toJSON().first()

    '{"1":1,"3":3,"2":[10,11,20]}'

Upvotes: 2

Mariusz

Reputation: 13926

You need to define the resulting type of the UDF using StructType, not MapType, like this:

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               ArrayType, StringType)

udf_result = StructType([
    StructField('1', IntegerType()),
    StructField('2', ArrayType(IntegerType())),  # [10, 11, 20] are integers
    StructField('3', StringType())
])
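As a sketch of how this return type could be used (the `merge_values` function, its argument order, and the column references are my assumptions, not part of the answer): the UDF wraps a plain Python function that returns a tuple matching the StructType fields.

```python
# Plain-Python core a PySpark UDF could wrap; returns a tuple matching the
# declared StructType fields ('1', '2', '3').
def merge_values(c1, c2, c3):
    return (c1['value'],                             # "1" -> its scalar value
            [v['id'] for v in c2['list'].values()],  # "2" -> the nested ids
            c3['key'])                               # "3" -> its string key

# With a live SparkSession it would be registered roughly like this:
# from pyspark.sql.functions import udf, col
# merge_udf = udf(merge_values, udf_result)
# df = df.select(merge_udf(col('1'), col('2'), col('3')).alias('out'))
```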

Upvotes: 4
