Reputation: 159
I need to write valid JSON, but Spark writes one JSON object per line, like:
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
The JSON above is not a valid document. Instead I need this:
[
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
]
How can I achieve this in Java?
Upvotes: 3
Views: 7030
Reputation: 11
def write_valid_json(df, path):
    """Write df to json files, one per partition, with each file being a
    valid json array of objects (instead of Spark's json lines format).

    Each part file will have the format:
    [{json-version-rec-1},
     {json-version-rec-2},
     ...,
     {json-version-rec-N}]

    Note: If a partition is empty an empty part file will be created.
    """
    def add_delimiters(js):
        try:
            curr = next(js)
        except StopIteration:
            yield ""
            return
        result = f"[{curr}"
        while True:
            try:
                nxt = next(js)
                yield f"{result},"
            except StopIteration:
                yield f"{result}]"
                return
            result = nxt

    df.toJSON().mapPartitions(add_delimiters).saveAsTextFile(path)
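To see why the generator produces a valid array, here is the same delimiter logic runnable outside Spark; `rows` is a stand-in for one partition of `toJSON()` strings:

```python
import json

# Same logic as add_delimiters above, lagging one record behind the
# iterator so the last record can be closed with "]".
def add_delimiters(js):
    try:
        curr = next(js)
    except StopIteration:
        yield ""          # empty partition -> empty part file
        return
    result = f"[{curr}"   # open the array on the first record
    while True:
        try:
            nxt = next(js)
            yield f"{result},"
        except StopIteration:
            yield f"{result}]"  # close the array on the last record
            return
        result = nxt

# One partition's worth of JSON-lines records:
rows = iter(['{"name": "Yin"}', '{"name": "Michael"}'])
text = "\n".join(add_delimiters(rows))
# The whole part file is now one valid JSON array:
assert json.loads(text) == [{"name": "Yin"}, {"name": "Michael"}]
```

Joining the yielded lines reproduces exactly what Spark writes into one part file, so parsing the joined text is a faithful check of the output format.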
Upvotes: 1
Reputation: 689
First, start by converting the DataFrame rows to JSON:
Scala
val jsonDs = df.toJSON
Java
Dataset<String> jsonDs = df.toJSON();
Scala example:
case class Data(name: String, age: Int)
case class DataObj(id: String, seq: Seq[Data])

val df = session.createDataFrame(Seq(
  DataObj("1", Seq(Data("n1", 1), Data("n2", 2))),
  DataObj("2", Seq(Data("n1", 1), Data("n2", 2), Data("n3", 3))),
  DataObj("3", Seq(Data("n1", 1))),
  DataObj("4", Seq(Data("n4", 44))),
  DataObj("5", Seq(Data("n5", 55)))
))

val jsonDs = df.toJSON
The next steps depend on whether you want to save to a single file or to one file per partition.
To save everything to a single file:

Scala

val count = jsonDs.count()
jsonDs
  .repartition(1) // make sure it is only one partition and in consequence one output file
  .rdd
  .zipWithIndex()
  .map { case (json, idx) =>
    val prefix = if (idx == 0) "[\n" else ""          // open the array on the first row
    val suffix = if (idx == count - 1) "\n]" else "," // close it on the last row
    prefix + json + suffix
  }
  .saveAsTextFile("path")
Java

long count = jsonDs.count();
jsonDs
  .repartition(1) // make sure it is only one partition and in consequence one output file
  .javaRDD()
  .zipWithIndex()
  .map(t -> (t._2 == 0 ? "[\n" : "") + t._1 + (t._2 == count - 1 ? "\n]" : ","))
  .saveAsTextFile("path");
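The zipWithIndex bracketing above can be sketched without Spark; `rows` stands in for the single partition's JSON strings and `idx` for the zipped index:

```python
import json

# Mirror of the zipWithIndex approach: decorate each row by position.
def bracket_by_index(rows):
    count = len(rows)
    out = []
    for idx, row in enumerate(rows):
        prefix = "[\n" if idx == 0 else ""           # first row opens the array
        suffix = "\n]" if idx == count - 1 else ","  # last row closes it
        out.append(prefix + row + suffix)
    return out

lines = bracket_by_index(['{"name": "Yin"}', '{"name": "Michael"}'])
text = "\n".join(lines)
assert json.loads(text) == [{"name": "Yin"}, {"name": "Michael"}]
```

Using an independent prefix and suffix per row (rather than a first/last if-else chain) also keeps a one-row DataFrame valid: the single row gets both the opening and the closing bracket.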
To save one file per partition:

Scala

jsonDs
  .mapPartitions(vals => Iterator("[" + vals.mkString(",") + "]"))
  .write
  .text("path")
Java

import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoders;

jsonDs
  .mapPartitions(input -> Arrays.asList("[" + StringUtils.join(input, ",") + "]").iterator(), Encoders.STRING())
  .write()
  .text("path");
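The mapPartitions variant reduces to a simple join per partition; a minimal sketch, where each inner list stands in for one partition of `toJSON()` output:

```python
import json

def wrap_partition(rows):
    # What mapPartitions emits per partition: a single bracketed line.
    return "[" + ",".join(rows) + "]"

partitions = [['{"name": "Yin"}', '{"name": "Michael"}'], ['{"name": "Ann"}'], []]
files = [wrap_partition(p) for p in partitions]
# Every part file, including the one from the empty partition, parses as JSON:
assert [json.loads(f) for f in files] == [
    [{"name": "Yin"}, {"name": "Michael"}],
    [{"name": "Ann"}],
    [],
]
```

Note that an empty partition yields "[]" here, which is still valid JSON, unlike the empty string produced for an empty partition in the Python answer above.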
Upvotes: 5