bali89

Reputation: 13

Convert all the columns of a Spark DataFrame into JSON and then include the JSON-formatted data as a column in another (parent) DataFrame

I converted a DataFrame (call it the child DataFrame) into JSON using df.toJSON.

After the JSON conversion, the schema looks like this:

root
 |-- value: string (nullable = true)

I used the following suggestion to get the child DataFrame into an intermediate parent schema/DataFrame:

scala> childDF.toJSON.select(struct($"value").as("data")).printSchema
root
 |-- data: struct (nullable = false)
 |    |-- value: string (nullable = true)

Now I need to extend the parentDF schema further so that it looks like this:

root
 |-- id
 |-- version 
 |-- data: struct (nullable = false)
 |    |-- value: string (nullable = true)

Q1) How can I build the id column from the id inside value (i.e. value.id should become the top-level id column)?

Q2) I need to bring version in from a different DataFrame (say versionDF), where version is a constant (the same value in every row). Should I fetch one row from versionDF, read the value of its version column, and then add it to parentDF as a literal?

Any code snippets would help.

Upvotes: 1

Views: 11393

Answers (2)

s.polam

Reputation: 10362

Instead of toJSON, use the to_json function inside a select statement, and select the required columns alongside it.

Check the code below.

// Read the constant version from the first row of versionDF
val version = versionDF.first.get(0)
parentDF.select($"id",struct(to_json(struct($"*")).as("value")).as("data"),lit(version).as("version"))

scala> parentDF.select($"id",struct(to_json(struct($"*")).as("value")).as("data"),lit(version).as("version")).printSchema
root
 |-- id: integer (nullable = false)
 |-- data: struct (nullable = false)
 |    |-- value: string (nullable = true)
 |-- version: double (nullable = false)
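A self-contained sketch of the same approach, written as a reusable function rather than REPL lines. It assumes versionDF holds one constant in its first column (read once on the driver, as Q2 suggests) and that childDF has an id column; the object and method names JsonColumnSketch/buildParent are hypothetical. col("*") is used in place of $"*" so the sketch does not depend on spark.implicits._.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, struct, to_json}

object JsonColumnSketch {
  // Assumption: versionDF has a single constant value in its first column
  def buildParent(childDF: DataFrame, versionDF: DataFrame): DataFrame = {
    // Fetch the constant once on the driver; lit() accepts Any
    val version = versionDF.first().get(0)
    childDF.select(
      col("id"),
      // Serialize every column of the row into one JSON string, wrapped as data.value
      struct(to_json(struct(col("*"))).as("value")).as("data"),
      lit(version).as("version")
    )
  }
}
```

With the sample data from the other answer, data.value for id 1 would be the string {"id":1,"col1":"a1","col2":"b1"}.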

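Update: wrapping the row struct as value before calling to_json makes data a plain JSON string with a top-level value key.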

scala> df.select($"id",to_json(struct(struct($"*").as("value"))).as("data"),lit(version).as("version")).printSchema
root
 |-- id: integer (nullable = false)
 |-- data: string (nullable = true)
 |-- version: integer (nullable = false)


scala> df.select($"id",to_json(struct(struct($"*").as("value"))).as("data"),lit(version).as("version")).show(false)
+---+------------------------------------------+-------+
|id |data                                      |version|
+---+------------------------------------------+-------+
|1  |{"value":{"id":1,"col1":"a1","col2":"b1"}}|1      |
|2  |{"value":{"id":2,"col1":"a2","col2":"b2"}}|1      |
|3  |{"value":{"id":3,"col1":"a3","col2":"b3"}}|1      |
+---+------------------------------------------+-------+

Update-1: without the extra wrapping struct, data holds the row's JSON directly.

scala> df.select($"id",to_json(struct($"*").as("value")).as("data"),lit(version).as("version")).printSchema
root
 |-- id: integer (nullable = false)
 |-- data: string (nullable = true)
 |-- version: integer (nullable = false)


scala> df.select($"id",to_json(struct($"*").as("value")).as("data"),lit(version).as("version")).show(false)
+---+--------------------------------+-------+
|id |data                            |version|
+---+--------------------------------+-------+
|1  |{"id":1,"col1":"a1","col2":"b1"}|1      |
|2  |{"id":2,"col1":"a2","col2":"b2"}|1      |
|3  |{"id":3,"col1":"a3","col2":"b3"}|1      |
+---+--------------------------------+-------+
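For Q2, collecting the version on the driver works; an alternative (a different technique from the lit() approach above) is to cross join the one-row versionDF onto every row, which avoids the driver round trip. This is a sketch under the stated assumption that versionDF has exactly one row; with more rows the cross join would multiply the output. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

object VersionCrossJoinSketch {
  // Assumption: versionDF contains exactly one row with a "version" column
  def withVersion(childDF: DataFrame, versionDF: DataFrame): DataFrame =
    childDF
      .select(
        col("id"),
        // Whole row as a JSON string, wrapped as data.value
        struct(to_json(struct(col("*"))).as("value")).as("data")
      )
      // Dataset.crossJoin (Spark 2.1+) appends version to every row
      .crossJoin(versionDF.select(col("version")))
}
```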

Upvotes: 3

Parvez Patel

Reputation: 211

Try this:

scala> val versionDF = List((1.0)).toDF("version")
versionDF: org.apache.spark.sql.DataFrame = [version: double]

scala> versionDF.show
+-------+
|version|
+-------+
|    1.0|
+-------+


scala> val version = versionDF.first.get(0)
version: Any = 1.0

scala>

scala> val childDF = List((1,"a1","b1"),(2,"a2","b2"),(3,"a3","b3")).toDF("id","col1","col2")
childDF: org.apache.spark.sql.DataFrame = [id: int, col1: string ... 1 more field]

scala> childDF.show
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|  a1|  b1|
|  2|  a2|  b2|
|  3|  a3|  b3|
+---+----+----+


scala>

scala> val parentDF =  childDF.toJSON.select(struct($"value").as("data")).withColumn("id",from_json($"data.value",childDF.schema).getItem("id")).withColumn("version",lit(version))
parentDF: org.apache.spark.sql.DataFrame = [data: struct<value: string>, id: int ... 1 more field]

scala> parentDF.printSchema
root
 |-- data: struct (nullable = false)
 |    |-- value: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- version: double (nullable = false)

scala> parentDF.show(false)
+----------------------------------+---+-------+
|data                              |id |version|
+----------------------------------+---+-------+
|[{"id":1,"col1":"a1","col2":"b1"}]|1  |1.0    |
|[{"id":2,"col1":"a2","col2":"b2"}]|2  |1.0    |
|[{"id":3,"col1":"a3","col2":"b3"}]|3  |1.0    |
+----------------------------------+---+-------+
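If only id is needed from the JSON string, a lighter option than parsing the whole row with from_json and childDF.schema is get_json_object, which extracts one field by JSON path. A minimal sketch, assuming the input is the output of toJSON converted with toDF() (a single string column named value); the helper name withIdFromJson is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, get_json_object, struct}

object IdFromJsonSketch {
  // Assumption: jsonDF is someDF.toJSON.toDF(), i.e. one string column "value"
  def withIdFromJson(jsonDF: DataFrame): DataFrame =
    jsonDF.select(
      // get_json_object returns a string, so cast back to int to match the original id
      get_json_object(col("value"), "$.id").cast("int").as("id"),
      struct(col("value")).as("data")
    )
}
```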

Upvotes: 1
