Reputation: 51
I am working with PySpark. I have a DataFrame
loaded from csv that contains the following schema:
root
|-- id: string (nullable = true)
|-- date: date (nullable = true)
|-- users: string (nullable = true)
If I show the first two rows it looks like:
+---+----------+---------------------------------------------------+
| id| date|users |
+---+----------+---------------------------------------------------+
| 1|2017-12-03|{"1":["xxx","yyy","zzz"],"2":["aaa","bbb"],"3":[]} |
| 2|2017-12-04|{"1":["uuu","yyy","zzz"],"2":["aaa"],"3":[]} |
+---+----------+---------------------------------------------------+
I would like to create a new DataFrame
that contains the 'user' string broken out by each element. I would like something similar to
id user_id user_product
1 1 xxx
1 1 yyy
1 1 zzz
1 2 aaa
1 2 bbb
1 3 <null>
2 1 uuu
etc...
I have tried many approaches but can't seem to get it working.
The closest I can get is defining the schema such as the following and creating a new df applying schema using from_json
:
userSchema = StructType([
StructField("user_id", StringType()),
StructField("product_list", StructType([
StructField("product", StringType())
]))
])
user_df = in_csv.select('id',from_json(in_csv.users, userSchema).alias("test"))
This returns the correct schema:
root
|-- id: string (nullable = true)
|-- test: struct (nullable = true)
| |-- user_id: string (nullable = true)
| |-- product_list: struct (nullable = true)
| | |-- product: string (nullable = true)
but when I show any part of the 'test' struct
it returns nulls
instead of values e.g.
user_df.select('test.user_id').show()
returns test.user_id :
+-------+
|user_id|
+-------+
| null|
| null|
+-------+
Maybe I shouldn't be using the from_json
as the users string is not pure JSON. Any help as to approach I could take?
Upvotes: 1
Views: 1288
Reputation: 1007
The schema should conform to the shape of the data. Unfortunately from_json
supports only StructType(...)
or ArrayType(StructType(...))
which won't be useful here, unless you can guarantee that all records have the same set of key.
Instead, you can use an UserDefinedFunction
:
import json
from pyspark.sql.functions import explode, udf
df = spark.createDataFrame([
(1, "2017-12-03", """{"1":["xxx","yyy","zzz"],"2":["aaa","bbb"],"3":[]}"""),
(2, "2017-12-04", """{"1":["uuu","yyy","zzz"],"2":["aaa"],"3":[]}""")],
("id", "date", "users")
)
@udf("map<string, array<string>>")
def parse(s):
try:
return json.loads(s)
except:
pass
(df
.select("id", "date",
explode(parse("users")).alias("user_id", "user_product"))
.withColumn("user_product", explode("user_product"))
.show())
# +---+----------+-------+------------+
# | id| date|user_id|user_product|
# +---+----------+-------+------------+
# | 1|2017-12-03| 1| xxx|
# | 1|2017-12-03| 1| yyy|
# | 1|2017-12-03| 1| zzz|
# | 1|2017-12-03| 2| aaa|
# | 1|2017-12-03| 2| bbb|
# | 2|2017-12-04| 1| uuu|
# | 2|2017-12-04| 1| yyy|
# | 2|2017-12-04| 1| zzz|
# | 2|2017-12-04| 2| aaa|
# +---+----------+-------+------------+
Upvotes: 1
Reputation: 2545
You dont need to use from_json
. You have to explode
two times, one for user_id
and one for users
.
import pyspark.sql.functions as F
df = sql.createDataFrame([
(1,'2017-12-03',{"1":["xxx","yyy","zzz"],"2":["aaa","bbb"],"3":[]} ),
(2,'2017-12-04',{"1":["uuu","yyy","zzz"],"2":["aaa"], "3":[]} )],
['id','date','users']
)
df = df.select('id','date',F.explode('users').alias('user_id','users'))\
.select('id','date','user_id',F.explode('users').alias('users'))
df.show()
+---+----------+-------+-----+
| id| date|user_id|users|
+---+----------+-------+-----+
| 1|2017-12-03| 1| xxx|
| 1|2017-12-03| 1| yyy|
| 1|2017-12-03| 1| zzz|
| 1|2017-12-03| 2| aaa|
| 1|2017-12-03| 2| bbb|
| 2|2017-12-04| 1| uuu|
| 2|2017-12-04| 1| yyy|
| 2|2017-12-04| 1| zzz|
| 2|2017-12-04| 2| aaa|
+---+----------+-------+-----+
Upvotes: 0