Ace

Reputation: 229

How to write dataframe as json array to a file? (scala)

I have a dataframe and I want to write it as a json array into a single file in Scala.

attempt 1:

dataframe.coalesce(1).write.format("json").save(destDir)

output 1: One row per line, where each row is a json object
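For example, with a dataframe having columns key1 and key2 (hypothetical here), the saved part file looks like this, one json object per line rather than an array:

{"key1":"value1","key2":"value2"}
{"key1":"value3","key2":"value4"}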

attempt 2:

dataframe.toJSON.coalesce(1).write.format("json").save(destDir)

output 2: same as output 1, but a weird looking json on each row {value: {key1:value1, key2:value2, ... }}
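With the same hypothetical columns, each line looks roughly like this, the json of the row being escaped inside a value field:

{"value":"{\"key1\":\"value1\",\"key2\":\"value2\"}"}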

attempt 3 (writing as String using java PrintWriter):

printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))

output3:

It writes a json array to a local path. If the path is on hdfs it fails with FileNotFound, even though the path and file exist.

Upvotes: 5

Views: 4258

Answers (1)

Vincent Doba

Reputation: 5078

To write a dataframe as a json array, first transform your dataframe to json strings, then transform those strings so that each row is a line of your future json file, and finally write the file with text instead of json.

Analysis

To write a dataframe to json, you can start from the .toJSON method, as in your attempts 2 and 3:

val rawJson = dataframe.toJSON

Now you have a dataframe with one column, value, containing a json representation of each row as a String.
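For example, with a hypothetical dataframe having columns key1 and key2, rawJson.show(false) displays something like:

+---------------------------------+
|value                            |
+---------------------------------+
|{"key1":"value1","key2":"value2"}|
|{"key1":"value3","key2":"value4"}|
+---------------------------------+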

To transform this dataframe into one where each row represents a line of your future file, you need to:

  • add a new row containing [ as first row of the dataframe
  • add a comma to all rows representing your json data, except for the last one
  • add a new row containing ] as last row of the dataframe

As you can see, the notions of "first" and "last" matter here, so you need to build an ordering of the rows in your dataframe. You can picture it like this:

+-------+--------------------+------------+
| order | row                | value      |
+-------+--------------------+------------+
| 0     | first row          | "["        |
| 1     | row with json      | "  {...}," | 
| 1     | row with json      | "  {...}," | 
| ...   | ...                | ...        |
| 1     | row with json      | "  {...}," |
| 1     | row with json      | "  {...}," |
| 2     | last row with json | "  {...}"  |
| 3     | last row           | "]"        |
+-------+--------------------+------------+

First, you can distinguish the last json row from the others. To do so, you can use window functions: you count the number of rows in a window that contains the current row and the next row, so each row is associated with 2, except the last one, which has no next row and is thus associated with 1.

val window = Window.rowsBetween(Window.currentRow, 1)

val jsonWindow = rawJson.withColumn("order", count("value").over(window))

However, you want the last row to have 2 in column "order" and the other rows to have 1. You can use the modulo (%) operator to achieve this:

val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)

Then you add a comma to all rows except the last one, meaning all rows whose column "order" is set to 1:

val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))

Those json lines should be indented in the final file, so you prefix them with two spaces:

val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit("  "), col("value")))

You add the first and last rows, which contain the opening and closing square brackets:

val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))

You order the rows and drop the helper column:

val orderedRows = unorderedRows.orderBy("order").drop("order")

You coalesce to a single partition, as you want only one file at the end:

val partitionedRows = orderedRows.coalesce(1)

And you write it as text:

partitionedRows.write.text(destDir)

And you're done!
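With the same hypothetical two-column dataframe, the single part file produced in destDir looks like this:

[
  {"key1":"value1","key2":"value2"},
  {"key1":"value3","key2":"value4"}
]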

Complete solution

Here is the complete solution, with imports. This solution works from spark 2.3 (tested with spark 3.0):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// needed for the .map encoder and Seq(...).toDF below; spark is your SparkSession
// (already imported automatically in spark-shell)
import spark.implicits._

val window = Window.rowsBetween(Window.currentRow, 1)

dataframe.toJSON
  // indent each json line with two spaces
  .map(jsonString => s"  $jsonString")
  // "order" is 1 for every row except the last one, which gets 2
  .withColumn("order", (count("value").over(window) % lit(2)) + lit(1))
  // add a trailing comma to all rows except the last one
  .withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
  // add the opening and closing brackets as first and last rows
  .unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
  .orderBy("order")
  .select("value")
  // a single partition to get a single output file
  .coalesce(1)
  .write.text(destDir)

Conclusion

You can write a spark dataframe as a json array by using only spark.

However, spark is a parallel computing framework, so enforcing an order and shrinking to one partition is not the way it is supposed to work. Moreover, as you can't change the name of the file output by spark, the saved file will have a .txt extension (although its content is a json array).

It may be better to save your dataframe with .write.json(destDir) and then rework the output with classic tools instead of building complicated logic to do it with spark.
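A minimal sketch of that reworking, assuming the dataframe was saved with dataframe.coalesce(1).write.json(destDir), that destDir lives on hdfs, and that the output file name result.json is acceptable (these names and the use of Hadoop's FileSystem API are assumptions, not part of the original question):

import scala.io.Source
import java.io.PrintWriter
import org.apache.hadoop.fs.{FileSystem, Path}

// hypothetical reworking step: read the single part file written by spark
// and wrap its json lines into a json array
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFile = fs.listStatus(new Path(destDir))
  .map(_.getPath)
  .find(_.getName.startsWith("part-"))
  .get
val lines = Source.fromInputStream(fs.open(partFile)).getLines().toSeq
// hypothetical output file name next to the part file
val writer = new PrintWriter(fs.create(new Path(destDir, "result.json")))
try {
  writer.write(lines.mkString("[\n  ", ",\n  ", "\n]"))
} finally {
  writer.close()
}

Going through Hadoop's FileSystem API is also why attempt 3 failed: a plain java PrintWriter only sees the local filesystem, hence the FileNotFound on an hdfs path.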

Upvotes: 4
