Teresa

Reputation: 77

extract value from a list of json in pyspark

I have a dataframe where a column is in the form of a list of JSON objects. I want to extract a specific value (score) from the column and create independent columns.

raw_data = [{"user_id" : 1234, "col" : [{"id":14577120145280,"score":64.71,"Elastic_position":0},{"id":14568530280240,"score":88.53,"Elastic_position":1},{"id":14568530119661,"score":63.75,"Elastic_position":2},{"id":14568530205858,"score":62.79,"Elastic_position":3},{"id":14568530414899,"score":60.88,"Elastic_position":4}]}]

import pandas as pd

df = pd.DataFrame.from_dict(raw_data)

I want to explode my result dataframe as:

[image: expected result — one row per user_id, with one column per score]

Upvotes: 1

Views: 2358

Answers (2)

pltc

Reputation: 6082

Assuming your JSON file looks like this:

# a.json
# {
#     "user_id" : 1234,
#     "col" : [
#         {"id":14577120145280,"score":64.71,"Elastic_position":0},
#         {"id":14568530280240,"score":88.53,"Elastic_position":1},
#         {"id":14568530119661,"score":63.75,"Elastic_position":2},
#         {"id":14568530205858,"score":62.79,"Elastic_position":3},
#         {"id":14568530414899,"score":60.88,"Elastic_position":4}
#     ]
# }

You can read it, flatten it, then pivot it like so:

from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.StructType([
    T.StructField('user_id', T.IntegerType()),
    T.StructField('col', T.StringType()),
])

df = spark.read.json('a.json', multiLine=True, schema=schema)
df.show(10, False)

# +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |user_id|col                                                                                                                                                                                                                                                                                           |
# +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |1234   |[{"id":14577120145280,"score":64.71,"Elastic_position":0},{"id":14568530280240,"score":88.53,"Elastic_position":1},{"id":14568530119661,"score":63.75,"Elastic_position":2},{"id":14568530205858,"score":62.79,"Elastic_position":3},{"id":14568530414899,"score":60.88,"Elastic_position":4}]|
# +-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


df.printSchema()
# root
#  |-- user_id: integer (nullable = true)
#  |-- col: string (nullable = true)

(df
    # this will parse your JSON string to JSON object
    .withColumn('col', F.from_json(
        F.col('col'),
        T.ArrayType(T.StructType([
            T.StructField('id', T.LongType()),
            T.StructField('score', T.DoubleType()),
            T.StructField('Elastic_position', T.IntegerType()),
        ]))
    ))
 
    .select('user_id', F.explode('col'))
    .groupBy('user_id')
    .pivot('col.Elastic_position')
    .agg(F.first('col.score'))
    .show(10, False)
)

# Output
# +-------+-----+-----+-----+-----+-----+
# |user_id|0    |1    |2    |3    |4    |
# +-------+-----+-----+-----+-----+-----+
# |1234   |64.71|88.53|63.75|62.79|60.88|
# +-------+-----+-----+-----+-----+-----+
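If you want to sanity-check what the explode-and-pivot steps do before running them on a cluster, the same reshaping can be sketched in plain Python (stdlib only; this is just the logic, not the Spark API):

```python
import json

# same record as in a.json
raw = '''{"user_id": 1234, "col": [
    {"id": 14577120145280, "score": 64.71, "Elastic_position": 0},
    {"id": 14568530280240, "score": 88.53, "Elastic_position": 1},
    {"id": 14568530119661, "score": 63.75, "Elastic_position": 2},
    {"id": 14568530205858, "score": 62.79, "Elastic_position": 3},
    {"id": 14568530414899, "score": 60.88, "Elastic_position": 4}]}'''

record = json.loads(raw)

# "explode": one (user_id, item) pair per element of the array column
exploded = [(record["user_id"], item) for item in record["col"]]

# "pivot": Elastic_position becomes the column key, score the cell value
pivoted = {}
for user_id, item in exploded:
    pivoted.setdefault(user_id, {})[item["Elastic_position"]] = item["score"]

print(pivoted)
# {1234: {0: 64.71, 1: 88.53, 2: 63.75, 3: 62.79, 4: 60.88}}
```

Each inner-dict key here corresponds to one of the pivoted columns 0–4 in the Spark output above.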

Upvotes: 2

U13-Forward

Reputation: 71620

Try using pd.Series.explode with groupby:

df = pd.DataFrame.from_dict(raw_data).explode('col')
(df.assign(col=df['col'].str['score'])
   .groupby('user_id')
   .agg(list)
   .apply(lambda x: (y := x.explode())
                    .set_axis(y.index + '_' + y.groupby(level=0).cumcount().astype(str)),
          axis=1)
   .reset_index())

   user_id  col_0  col_1  col_2  col_3  col_4
0     1234  64.71  88.53  63.75  62.79  60.88

It first constructs a DataFrame and explodes the col column, then groups by the duplicated user_id values and performs another explode to reshape from long to wide, building the col_0 to col_4 labels with cumcount. Note the walrus operator (:=) in the lambda requires Python 3.8+.
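The one-liner is dense, so here is one way the same result could be produced step by step with pivot instead of the explode/set_axis trick (equivalent output on this data; the intermediate pos column is an invented helper name, not part of the original answer):

```python
import pandas as pd

raw_data = [{"user_id": 1234, "col": [
    {"id": 14577120145280, "score": 64.71, "Elastic_position": 0},
    {"id": 14568530280240, "score": 88.53, "Elastic_position": 1},
    {"id": 14568530119661, "score": 63.75, "Elastic_position": 2},
    {"id": 14568530205858, "score": 62.79, "Elastic_position": 3},
    {"id": 14568530414899, "score": 60.88, "Elastic_position": 4}]}]

# one row per dict in the list column
df = pd.DataFrame.from_dict(raw_data).explode('col')

# pull the score out of each dict
df['col'] = df['col'].str['score']

# number the scores within each user_id, then pivot long -> wide
df['pos'] = df.groupby('user_id').cumcount()
wide = (df.pivot(index='user_id', columns='pos', values='col')
          .add_prefix('col_')
          .reset_index())
print(wide)
```

Using pivot with an explicit counter column trades the compact chain for code that is easier to debug one step at a time.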

Upvotes: 0
