Reputation: 229
I have a dataframe and I want to write it as a json array into a single file in Scala.
dataframe.coalesce(1).write.format("json").save(destDir)
output 1: one row per line, where each row is a json object
dataframe.toJSON.coalesce(1).write.format("json").save(destDir)
output 2: same as output 1, but a weird looking json on each row: {value: {key1:value1, key2:value2, ... }}
printWriter.write(dataframe.toJSON.collect.mkString("[",",","]"))
output 3: it writes an array of json objects to a local path. If the path is on HDFS it says FileNotFound, even though the path and the file exist.
Upvotes: 5
Views: 4258
Reputation: 5078
To write a dataframe as a json array, you first transform your dataframe to json strings, then you transform those strings so that each row becomes a line of your future json file, and finally you write the file with text instead of json.
To write a dataframe to json, you can start from the .toJSON method as in your attempts 2 and 3:
val rawJson = dataframe.toJSON
Now you have a dataframe with one column value containing a json representation of rows as a String.
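For example, with a hypothetical dataframe containing two rows and columns key1 and key2, rawJson.show(false) would print something like:
+---------------------------------+
|value                            |
+---------------------------------+
|{"key1":"value1","key2":"value2"}|
|{"key1":"value3","key2":"value4"}|
+---------------------------------+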
To transform this dataframe into a dataframe whose rows each represent a line of your future file, you need to:
- add "[" as first row of the dataframe
- add "]" as last row of the dataframe
As you see, concepts like "first" and "last" are important in your case, thus you need to build an ordering of the rows in your dataframe. You can represent it like that:
+-------+--------------------+------------+
| order | row                | value      |
+-------+--------------------+------------+
| 0     | first row          | "["        |
| 1     | row with json      | " {...},"  |
| 1     | row with json      | " {...},"  |
| ...   | ...                | ...        |
| 1     | row with json      | " {...},"  |
| 1     | row with json      | " {...},"  |
| 2     | last row with json | " {...}"   |
| 3     | last row           | "]"        |
+-------+--------------------+------------+
First, you can distinguish the last row with json from the others. To do so, you can use Window functions: you count the number of rows in a window that contains the current row and the next row. This associates each row with 2, except the last one, which has no next row and is thus associated with 1.
val window = Window.rowsBetween(Window.currentRow, 1)
val jsonWindow = rawJson.withColumn("order", count("value").over(window))
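Continuing the hypothetical two-row example, jsonWindow.show(false) would print something like:
+---------------------------------+-----+
|value                            |order|
+---------------------------------+-----+
|{"key1":"value1","key2":"value2"}|2    |
|{"key1":"value3","key2":"value4"}|1    |
+---------------------------------+-----+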
However, you want the last row to have 2 in column "order" and the other rows to have 1 in column "order". You can use the modulo (%) function to achieve this:
val jsonRowsWithOrder = jsonWindow.withColumn("order", (col("order") % lit(2)) + 1)
Then you add a comma to all rows except the last one, meaning you add a comma to all rows whose column "order" is set to 1:
val jsonRowsWithCommas = jsonRowsWithOrder.withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
Those lines should be indented in the final file, so you indent them:
val indentedJsonRows = jsonRowsWithCommas.withColumn("value", concat(lit(" "), col("value")))
You add the first and last rows, which contain the opening and closing square brackets:
val unorderedRows = indentedJsonRows.unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
You order it:
val orderedRows = unorderedRows.orderBy("order").drop("order")
You coalesce to have only one partition as you want only one file at the end:
val partitionedRows = orderedRows.coalesce(1)
And you write it as text:
partitionedRows.write.text(destDir)
And you're done!
Here is the complete solution, with imports. This solution works from Spark 2.3 (tested with Spark 3.0):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Needed for .toDF on a Seq and for the Encoder used by .map
// (spark is the active SparkSession, available out of the box in spark-shell)
import spark.implicits._

val window = Window.rowsBetween(Window.currentRow, 1)
dataframe.toJSON
  // indent each json row
  .map(jsonString => s" $jsonString")
  // the last json row gets order 2, the others get order 1
  .withColumn("order", (count("value").over(window) % lit(2)) + lit(1))
  // append a comma to every json row except the last one
  .withColumn("value", when(col("order").equalTo(1), concat(col("value"), lit(","))).otherwise(col("value")))
  // add the opening and closing brackets as first and last rows
  .unionByName(Seq((0, "["), (3, "]")).toDF("order", "value"))
  .orderBy("order")
  .select("value")
  .coalesce(1)
  .write.text(destDir)
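For the hypothetical two-row dataframe used above, the single part file written under destDir would then look roughly like this (the order of the json rows inside the brackets depends on your data):
[
 {"key1":"value1","key2":"value2"},
 {"key1":"value3","key2":"value4"}
]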
You can write a Spark dataframe as a json array by using only Spark. However, Spark is a parallel computing framework, so enforcing an order and shrinking to one partition is not the way it is supposed to work. Moreover, as you can't change the name of the file output by Spark, the saved file will have a .txt extension (but inside it is a json array).
It may be better to save your dataframe with .write.json(destDir) and then rework the output with classic tools instead of creating complicated logic to make it with Spark.
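For instance, here is a minimal sketch of that post-processing, assuming destDir is a local path (for HDFS you would go through the Hadoop FileSystem API instead); the result.json file name is just an illustration:
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

// Let Spark write the json lines the way it wants (possibly several part files)
dataframe.write.json(destDir)

// Collect the lines from every part file Spark produced
val partFiles = Files.list(Paths.get(destDir)).iterator().asScala
  .filter(_.getFileName.toString.startsWith("part-"))
  .toList
val lines = partFiles.flatMap(p => Files.readAllLines(p).asScala)

// Join them with commas, wrap them in brackets and save the result
val jsonArray = lines.mkString("[\n ", ",\n ", "\n]")
Files.write(Paths.get(destDir, "result.json"), jsonArray.getBytes("UTF-8"))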
Upvotes: 4