Mark A

Reputation: 2135

PySpark create dataframe with column type dictionary

I want to create a simple PySpark dataframe with one column containing JSON. I created the schema for the groups column and created one row.

from pyspark.sql import types as T

schema = T.StructType([
    T.StructField(
        'groups', T.ArrayType(
            T.StructType([
                T.StructField("types", T.ArrayType(T.StringType(), False)),
                T.StructField("label", T.StringType())
            ])
        )
    )
])

groups_rows = [{
    "groups": [
        {
            "types": ["baseball", "basketball"],
            "label": "Label 1"
        },
        {
            "types": ["football"],
            "label": "Label 2"
        }
    ]
}]


data = [groups_rows]

sections_df = spark.createDataFrame(data=data, schema=schema)

When I initialize the dataframe I get a type error:

TypeError: field groups: ArrayType(StructType(List(StructField(types,ArrayType(StringType,false),true),StructField(label,StringType,true))),true) can not accept object {'groups': [{'types': ['baseball', 'basketball'], 'label': 'Label 1'}, {'types': ['football'], 'label': 'Label 2'}]} in type <class 'dict'>

What is the cause of this error? What should I be doing differently in terms of setting up this dataframe? Should I use a MapType?

Update:

I need that JSON list of groups on every row, so it can't be split into separate rows. What about converting it to a JSON string, making the groups column via withColumn("groups", groups_json_str), and doing the conversion with from_json? I tried this, but I was getting schema errors:


groups =  [
    {
        "recTypes": ["readinghistory", "popular"],
        "sectionLabel": "Reader favorites you missed"
    },
    {
        "recTypes": ["contentpacks"],
        "sectionLabel": "Based on your interests"
    }
]


import json

groups_json = json.dumps(groups)

from pyspark.sql import functions as F

groups_schema = T.ArrayType(
    T.MapType([
        T.StructField("types", T.ArrayType(T.StringType(), False)),
        T.StructField("label", T.StringType())
    ])
)

df = df.withColumn("groups", F.lit(groups_json)) \
    .withColumn("groups", F.to_json(F.col("sections"), groups_schema))

Error:

AttributeError: 'ArrayType' object has no attribute 'items'

Upvotes: -1

Views: 1002

Answers (2)

Zafar

Reputation: 2016

This worked for me:

from pyspark.sql import types as T

schema = T.StructType([
  T.StructField('groups', 
                T.ArrayType(
                  T.StructType([
                    T.StructField('label', T.StringType(), True), 
                    T.StructField('types', 
                                  T.ArrayType(T.StringType(), True),
                                  True)
                  ]), True), 
                True)
])

groups_rows = [{
    "groups": [
        {
            "types": ["baseball", "basketball"],
            "label": "Label 1"
        },
        {
            "types": ["football"],
            "label": "Label 2"
        }
    ]
}]


data = groups_rows

sections_df = spark.createDataFrame(data=data, schema=schema)

It looks like wrapping everything in another list, data = [groups_rows], was causing the issue. Why were you doing that?
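
To make the shape issue concrete, here is a minimal sketch of what createDataFrame sees in each case (the data_ok/data_bad names are just for illustration):

# createDataFrame expects a list of rows; each dict is one row keyed by column name.
data_ok = groups_rows     # [{'groups': [...]}]   -> one row with a 'groups' field
data_bad = [groups_rows]  # [[{'groups': [...]}]] -> one "row" that is itself a list,
                          # so the inner dict gets offered to the ArrayType 'groups'
                          # field, which raises the TypeError from the question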

UPDATE

After pondering this I think I know what you are looking for. Technically it can't be done the way you want, but I can get you close enough. Run this code:

schema = T.StructType([
  T.StructField('groups', 
                T.ArrayType(
                  T.MapType(T.StringType(), 
                            T.ArrayType(T.StringType(), True)
                  ), True), 
                True)
])

groups_rows = [{
    "groups": [
        {
            "types": ["baseball", "basketball"],
            "label": ["Label 1"]
        },
        {
            "types": ["football"],
            "label": ["Label 2"]
        }
    ]
}]

sections_df = spark.createDataFrame(data=groups_rows, schema=schema)
sections_df.show()

So the consideration here is that, for Spark, your map needs consistent typing for its keys and for its values. I don't think there's a way to mix String and Array[String] values in PySpark (I have done it in Scala). To work around this I just made all the values Array[String]. Note that Spark is written in Scala, which is strongly typed; there is no encoder for Map[String, Any] in Spark.
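
If you do need the original mixed shape (a string label alongside an array of strings), the from_json route floated in the question's update should work, provided the schema is an ArrayType of StructType (not a MapType) and the parsing uses from_json rather than to_json. A minimal sketch, assuming an active spark session; the spark.range(1) row is just a placeholder for your real dataframe:

import json
from pyspark.sql import functions as F, types as T

groups_schema = T.ArrayType(T.StructType([
    T.StructField("recTypes", T.ArrayType(T.StringType()), True),
    T.StructField("sectionLabel", T.StringType(), True)
]))

groups_json = json.dumps(groups)  # the list from the question's update

# Attach the same JSON string to every row, then parse it into structs.
df = spark.range(1) \
    .withColumn("groups", F.from_json(F.lit(groups_json), groups_schema))
df.printSchema()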

Upvotes: 1

wwnde

Reputation: 26676

This is JSON, and Spark has built-in methods to read it; defining a schema may not be necessary. Have you tried:

df = spark.read.json(sc.parallelize(groups_rows)).na.fill('') 
df.printSchema()

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- types: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

You can then select as required:

df.selectExpr('inline(groups)').show(truncate=False)

+-------+----------------------+
|label  |types                 |
+-------+----------------------+
|Label 1|[baseball, basketball]|
|Label 2|[football]            |
+-------+----------------------+
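
For reference, the same table can also be produced with explode instead of inline, naming the struct fields yourself; a small sketch:

from pyspark.sql import functions as F

# explode turns each element of the 'groups' array into its own row,
# then the struct fields are selected individually
df.select(F.explode('groups').alias('group')) \
  .select('group.label', 'group.types') \
  .show(truncate=False)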

Alternatively, write the JSON to a file and read it back; on Databricks you can use the dbutils utilities. Code below:

dbutils.fs.put("/tmp/groupd.json", """
{
    "groups": [
        {
            "types": ["baseball", "basketball"],
            "label": "Label 1"
        },
        {
            "types": ["football"],
            "label": "Label 2"
        }
    ]
}
""", True)

spark.read.option('multiline',True).option('mode','permissive').json('/tmp/groupd.json').show()

Upvotes: 0
