Reputation: 205
I have a Spark dataframe which I need to write to MongoDB. How can I write some of the columns of the dataframe as nested/hierarchical JSON in MongoDB? Let's say the dataframe has 6 columns: col1, col2, ..., col5, col6. I would want col1, col2, col3 at the first hierarchy level and the remaining columns col4 to col6 at the second hierarchy level — something like this:
{
"col1": 123,
"col2": "abc",
"col3": 45,
"fields": {
"col4": "ert",
"col5": 45,
"col6": 56
}
}
how do I achieve this in pyspark?
Upvotes: 2
Views: 619
Reputation: 31540
Use the to_json and struct built-in functions for this case.
Example:
# Input dataframe with six flat columns:
df.show()
#+----+----+----+----+----+----+
#|col1|col2|col3|col4|col5|col6|
#+----+----+----+----+----+----+
#| 123| abc| 45| ert| 45| 56|
#+----+----+----+----+----+----+
from pyspark.sql.functions import *

# Build the nested layout once and reuse it below: col1..col3 stay at
# the top level while col4..col6 are wrapped in a sub-struct aliased
# "fields".
nested = struct(
    "col1", "col2", "col3",
    struct("col4", "col5", "col6").alias("fields"),
)

# Serialize the nested struct into a JSON string column "jsn".
df.withColumn("jsn", to_json(nested)).show(10, False)
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|col1|col2|col3|col4|col5|col6|jsn                                                                                    |
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|123 |abc |45  |ert |45  |56  |{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+

# Remember the original column names so they can be dropped, keeping
# only the generated JSON column.
cols = df.columns
df.withColumn("jsn", to_json(nested)).drop(*cols).show(10, False)
#+---------------------------------------------------------------------------------------+
#|jsn                                                                                    |
#+---------------------------------------------------------------------------------------+
#|{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+---------------------------------------------------------------------------------------+

# Alternative: keep "jsn" as a struct column and let toJSON serialize
# the whole row.
df.withColumn("jsn", nested).drop(*cols).toJSON().collect()
#[u'{"jsn":{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}}']

# To write the nested rows out as a JSON file:
df.withColumn("jsn", nested). \
    drop(*cols). \
    write. \
    format("json"). \
    save("<path>")
Update: the jsn column is represented as a JSON struct:
# Inspect the schema: "jsn" is a struct holding col1..col3 plus the
# nested "fields" struct with col4..col6.
nested_df = df.withColumn(
    "jsn",
    struct(
        "col1", "col2", "col3",
        struct("col4", "col5", "col6").alias("fields"),
    ),
).drop(*cols)
nested_df.printSchema()
#root
# |-- jsn: struct (nullable = false)
# |    |-- col1: string (nullable = true)
# |    |-- col2: string (nullable = true)
# |    |-- col3: string (nullable = true)
# |    |-- fields: struct (nullable = false)
# |    |    |-- col4: string (nullable = true)
# |    |    |-- col5: string (nullable = true)
# |    |    |-- col6: string (nullable = true)
Upvotes: 1