agreüs
agreüs

Reputation: 109

pyspark: turn array of dict to new columns

I am struggling to transform my pyspark dataframe which looks like this:

df = spark.createDataFrame([('0018aad4',[300, 450], ['{"v1": "blue"}', '{"v2": "red"}']), ('0018aad5',[300], ['{"v1": "blue"}'])],[ "id","Tlist", 'Tstring'])
df.show(2, False)

+--------+----------+-------------------------------+
|id      |Tlist     |Tstring                        |
+--------+----------+-------------------------------+
|0018aad4|[300, 450]|[{"v1": "blue"}, {"v2": "red"}]|
|0018aad5|[300]     |[{"v1": "blue"}]               |
+--------+----------+-------------------------------+

to this:

df_result = spark.createDataFrame([('0018aad4',[300, 450], 'blue', 'red'), ('0018aad5',[300], 'blue', None)],[ "id","Tlist", 'v1', 'v2'])
df_result.show(2, False)

+--------+----------+----+----+
|id      |Tlist     |v1  |v2  |
+--------+----------+----+----+
|0018aad4|[300, 450]|blue|red |
|0018aad5|[300]     |blue|null|
+--------+----------+----+----+

I tried to pivot and a bunch of others things but don't get the result above.

Note that I don't have the exact number of dict in the column Tstring

Do you know how I can do this?

Upvotes: 2

Views: 645

Answers (3)

blackbishop
blackbishop

Reputation: 32640

Using transform function you can convert each element of the array into a map type. After that, you can use aggregate function to get one map, explode it then pivot the keys to get the desired output:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"), 
        F.col("Tstring")[0], 
        lambda acc, x: F.map_concat(acc, x)
    )
).select(
    "id", "Tlist", F.explode("Tstring")
).groupby(
    "id", "Tlist"
).pivot("key").agg(F.first("value"))


df1.show()
#+--------+----------+----+----+
#|id      |Tlist     |v1  |v2  |
#+--------+----------+----+----+
#|0018aad4|[300, 450]|blue|red |
#|0018aad5|[300]     |blue|null|
#+--------+----------+----+----+

I'm using Spark 3.1+, so the higher-order functions such as transform are available in dataframe API but you can do the same using expr for spark <3.1:

df1 = (df.withColumn("Tstring", F.expr("transform(Tstring, x-> from_json(x, 'map<string,string>'))"))
       .withColumn("Tstring", F.expr("aggregate(slice(Tstring, 2, size(Tstring)), Tstring[0], (acc, x) -> map_concat(acc, x))"))
       .select("id", "Tlist", F.explode("Tstring"))
       .groupby("id", "Tlist")
       .pivot("key")
       .agg(F.first("value"))
       )

Upvotes: 1

OooO V5
OooO V5

Reputation: 11

Slightly over-fitting the example (You might need to tweak it for any generalization), you can get the elements from the Tstring column using their index:

partial_results = df.withColumn("v1", df.Tstring[0]).withColumn("v2", df.Tstring[1])

+--------+----------+--------------+-------------+
|      id|     Tlist|            v1|           v2|
+--------+----------+--------------+-------------+
|0018aad4|[300, 450]|{"v1": "blue"}|{"v2": "red"}|
|0018aad5|     [300]|{"v1": "blue"}|         null|
+--------+----------+--------------+-------------+

Having this you can do some cleaning to achieve the wanted result

from pyspark.sql.functions import regexp_replace


maximum_color_length = 100
wanted_df = df.withColumn(
    "v1",
    regexp_replace(df.Tstring[0].substr(9, maximum_color_length), r"\"\}", "")
).withColumn(
    "v2",
    regexp_replace(df.Tstring[1].substr(9, maximum_color_length), r"\"\}", "")
).drop(
    "Tstring"
)

+--------+----------+----+----+
|      id|     Tlist|  v1|  v2|
+--------+----------+----+----+
|0018aad4|[300, 450]|blue| red|
|0018aad5|     [300]|blue|null|
+--------+----------+----+----+

Upvotes: 1

DontDownvote
DontDownvote

Reputation: 192

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import *
from collections import *
from pyspark.sql.functions import udf,explode
from pyspark.sql.types import StringType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = SparkSession(sc)

df= spark.createDataFrame(
    [
        ('0018aad4', [{"val1":"blue", "val2":"red"}],[300,500]), 
         ('0018aad', [{"val1":"blue", "val2":"null"}],[300])
       
        ],("ID","List","Tlist")
    )

df2 = df.select(df.ID,explode(df.List).alias("Dict"),df.Tlist )
df2.withColumn("Val1", F.col("Dict").getItem("val1")).withColumn("Val2", F.col("Dict").getItem("val2")).show(truncate=False)


+--------+----------------------------+----------+----+----+
|ID      |Dict                        |Tlist     |Val1|Val2|
+--------+----------------------------+----------+----+----+
|0018aad4|{val2 -> red, val1 -> blue} |[300, 500]|blue|red |
|0018aad |{val2 -> null, val1 -> blue}|[300]     |blue|null|

+--------+----------------------------+----------+----+----+

this is what you are looking for.

Upvotes: 0

Related Questions