Reputation: 3672
The data looks like this:
+---+-----+-----------------------------+
| id|point|                         data|
+---+-----+-----------------------------+
|abc|    6|{"key1":"124", "key2": "345"}|
|dfl|    7|{"key1":"777", "key2": "888"}|
|4bd|    6|{"key1":"111", "key2": "788"}|
+---+-----+-----------------------------+
I am trying to break it into the following format.
+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|dfl|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+
The explode function explodes the dataframe into multiple rows, but that is not the desired solution.
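For illustration, a minimal sketch of what explode produces (assuming Spark 2.2+, where from_json can parse the JSON string into a MapType first); it multiplies rows instead of adding columns:

from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType

# Parse the JSON string into a map, then explode it: one output row per key
parsed = df.withColumn("data", F.from_json("data", MapType(StringType(), StringType())))
parsed.select("id", "point", F.explode("data")).show()
# +---+-----+----+-----+
# | id|point| key|value|
# +---+-----+----+-----+
# |abc|    6|key1|  124|
# |abc|    6|key2|  345|
# ...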
Note: This solution does not answer my question: PySpark "explode" dict in column
Upvotes: 46
Views: 84348
Reputation: 26
You can also use the from_json function to achieve this:
from pyspark.sql.functions import from_json, col

data = [("abc", 6, '{"key1":"124", "key2": "345"}'),
        ("dfl", 7, '{"key1":"777", "key2": "888"}'),
        ("4bd", 6, '{"key1":"111", "key2": "788"}')]
df = spark.createDataFrame(data, ["id", "point", "data"])

# DDL-formatted schema string describing the JSON payload
schema = "key1 string, key2 string"

# Parse the JSON string into a struct, then pull each key out as a column
df = df.withColumn("data", from_json(col("data"), schema)) \
       .select("id", "point", "data.key1", "data.key2")
df.show()
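Worth noting: from_json does not fail on malformed input; any row whose string cannot be parsed simply comes back as null. A minimal sketch for spotting such rows (run before the data column is overwritten above):

# Rows whose JSON failed to parse show up as a null struct
df.withColumn("parsed", from_json(col("data"), schema)) \
  .filter(col("parsed").isNull()) \
  .show()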
Upvotes: 0
Reputation: 170
Simply do this:
df.select("id", "point", "data.*").show()
It will give you the desired output shown in the question.
Explanation:
To expand a struct-type column, 'data.*' can be used in the select. Doing this expands the data column so that each key inside it becomes its own column. Note that this requires data to already be a struct (e.g. parsed with from_json), not a raw JSON string; see the sketch below.
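If data is still a raw JSON string rather than a struct, it needs to be parsed first; a minimal sketch (the DDL schema string is an assumption based on the question's keys):

from pyspark.sql.functions import from_json, col

# Parse the JSON string into a struct so that data.* works
df = df.withColumn("data", from_json(col("data"), "key1 string, key2 string"))
df.select("id", "point", "data.*").show()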
Upvotes: 0
Reputation: 155
With this approach you only need to supply the name of the column holding the JSON content; there is no need to define a schema, as the keys are discovered automatically:
json_col_name = 'data'
# Discover the keys by expanding the struct column
keys = df.select(f"{json_col_name}.*").columns
# Build "data.key key" expressions (value plus alias) for selectExpr
json_fields = [f"{json_col_name}.{key} {key}" for key in keys]
# Keep every other column unchanged
main_fields = [key for key in df.columns if key != json_col_name]
df_new = df.selectExpr(main_fields + json_fields)
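With the question's dataframe (assuming data has already been parsed into a struct), a quick way to sanity-check the generated expressions before selecting:

print(main_fields + json_fields)
# ['id', 'point', 'data.key1 key1', 'data.key2 key2']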
Upvotes: 3
Reputation: 758
As mentioned by @jxc, json_tuple should work fine if you cannot define the schema beforehand and you only need to deal with a single level of JSON string. I think it's more straightforward and easier to use. Strangely, I didn't find anyone else mention this function before.
In my use case, the original dataframe schema was StructType(List(StructField(a,StringType,true))), with the JSON string column shown as:
+---------------------------------------+
|a |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"} |
|{"k1": "v13", "k2": "23"} |
+---------------------------------------+
Expand the JSON fields into new columns with json_tuple:
from pyspark.sql import functions as F
df = df.select(F.col('a'),
               F.json_tuple(F.col('a'), 'k1', 'k2', 'k3')
                .alias('k1', 'k2', 'k3'))
df.schema
df.show(truncate=False)
The documentation doesn't say much about it, but at least in my use case, the new columns extracted by json_tuple are StringType, and it only extracts a single depth of the JSON string.
StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))
+---------------------------------------+---+----+-------+
|a |k1 |k2 |k3 |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2 |{"m":1}|
|{"k1": "v11", "k3": "v33"} |v11|null|v33 |
|{"k1": "v13", "k2": "23"} |v13|23 |null |
+---------------------------------------+---+----+-------+
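If you do need a nested value (like k3.m above), one option is get_json_object with a JSONPath expression; a small sketch against the same column:

from pyspark.sql import functions as F

# Pull a nested field directly out of the raw JSON string
df.select('a', F.get_json_object('a', '$.k3.m').alias('k3_m')).show(truncate=False)
# returns "1" for the first row and null where k3 is not an object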
Upvotes: 3
Reputation: 1383
All credit to Shrikant Prabhu.
You can simply use SQL
SELECT id, point, data.*
FROM original_table
This way, the schema of the new table will adapt if the data changes, and you won't have to change anything in your pipeline.
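To run that from PySpark, a minimal sketch (original_table is just the placeholder name from the SQL above, and data is assumed to already be a struct):

# Register the dataframe as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("original_table")
spark.sql("SELECT id, point, data.* FROM original_table").show()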
Upvotes: 0
Reputation: 551
This works for my use case
from pyspark.sql.functions import from_json

data1 = spark.read.parquet(path)
# Infer the JSON schema by reading the string column as its own JSON dataset
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
# Parse the JSON string into a struct column named "data"
data2 = data1.withColumn("data", from_json("json_col", json_schema))
# All columns except the parsed struct itself
col1 = data2.columns
col1.remove("data")
# Prefix each JSON key with "data." so it can be selected as a top-level column
col2 = data2.select("data.*").columns
append_str = "data."
col3 = [append_str + val for val in col2]
col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")
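One caveat: spark.read.json over the full RDD makes an extra pass over the data just to infer the schema. On a large table you might infer from a sample instead; a rough sketch (the 0.1 fraction is an arbitrary assumption, and sampling can miss keys that appear only in rare rows):

# Infer the schema from a sample of rows instead of the whole column
sample_rdd = data1.rdd.map(lambda row: row.json_col).sample(False, 0.1, seed=42)
json_schema = spark.read.json(sample_rdd).schema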
Upvotes: 0
Reputation: 13998
As suggested by @pault, the data field is a string field. Since the keys are the same (i.e. 'key1', 'key2') in the JSON string across rows, you might also use json_tuple() (this function is new in version 1.6, based on the documentation):
from pyspark.sql import functions as F
df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()
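For the question's sample data, this should show:

+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|dfl|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+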
Below is my original post, which is most likely WRONG if the original table comes from df.show(truncate=False), and the data field is thus NOT a Python data structure.
Since you had exploded the data into rows, I assumed the column data was a Python data structure rather than a string:
from pyspark.sql import functions as F
df.select('id', 'point',
          F.col('data').getItem('key1').alias('key1'),
          F.col('data')['key2'].alias('key2')).show()
Upvotes: 8
Reputation: 41987
As long as you are using Spark version 2.1 or higher, pyspark.sql.functions.from_json should get you your desired result, but you would first need to define the required schema:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('key1', StringType(), True),
        StructField('key2', StringType(), True)
    ]
)

df.withColumn("data", from_json("data", schema))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()
which should give you
+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|dfl|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+
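As a side note, since Spark 2.3 from_json also accepts the schema as a DDL-formatted string, which is a bit terser; a sketch that should produce the same result:

df.withColumn("data", from_json("data", "key1 string, key2 string"))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()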
Upvotes: 61