pavan yadav
pavan yadav

Reputation: 159

How to write valid json in spark

I need to write valid json but spark allows to write single row at a time like:

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}

Above Json is not valid. Instead I need this:

{
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
}

How Can i achieve it in java?

Upvotes: 3

Views: 7030

Answers (2)

Robin Carnow
Robin Carnow

Reputation: 11

In Python

   def write_valid_json(df, path):
       """Write df to json files, one per partition, with each file being a
           valid json array of objects (instead of Spark's json lines format).

        Each part file will have the format:
            [{json-version-rec-1},
            {json-version-rec-2},
            ...,
            {json-version-rec-N}]

        Note: If a partition is empty an empty part file will be created.
        """

        def add_delimiters(js):
            try:
                curr = next(js)
            except StopIteration:
                yield ""
                return

            result = f"[{curr}"
            while True:
                try:
                    nxt = next(js)
                    yield f"{result},"
                except StopIteration:
                    yield f"{result}]"
                    return
                curr = nxt 
                result = curr
 
        df.toJSON().mapPartitions(add_delimiters).saveAsTextFile(path)

Upvotes: 1

Piotr Kalański
Piotr Kalański

Reputation: 689

Frist start from converting DataFrame rows to json:

Scala

val jsonDs = df.toJSON

Java

Dataset<String> jsonDs = simpleProf.toJSON();

Scala example:

case class Data(name: String, age: Int)
case class DataObj(id: String, seq: Seq[Data])

val df = session.createDataFrame(Seq(
 DataObj("1", Seq(Data("n1", 1), Data("n2", 2))),
 DataObj("2", Seq(Data("n1", 1), Data("n2", 2), Data("n3", 3))),
 DataObj("3", Seq(Data("n1", 1))),
 DataObj("4", Seq(Data("n4", 44))),
 DataObj("5", Seq(Data("n5", 55)))
))

val jsonDs = df.toJSON

Next steps depends on whether you want to save to one file or multiple files per partition.

Save to one JSON file

Scala

val count = jsonDs.count()
jsonDs
  .repartition(1) // make sure it is only one partition and in consequence one output file
  .rdd
  .zipWithIndex()
  .map { case(json, idx) =>
      if(idx == 0) "[\n" + json + "," // first row
      else if(idx == count-1) json + "\n]" // last row
      else json + ","
  }
  .saveAsTextFile("path")

Java

jsonDs
  .repartition(1) // make sure it is only one partition and in consequence one output file
  .javaRDD()
  .zipWithIndex()
  .map(t -> t._2 == 0 ? "[\n" + t._1 + "," : t._2 == count-1 ? t._1 + "\n]" : t._1 + ",")
  .saveAsTextFile("path");

Save to multiple JSON files for each partition

Scala

jsonDs
  .mapPartitions(vals => Iterator("[" + vals.mkString(",") + "]"))
  .write
  .text("path")

Java

import org.apache.commons.lang3.StringUtils;

jsonDs
  .mapPartitions(input -> Arrays.asList("[" + StringUtils.join(input, ",") + "]").iterator(), Encoders.STRING())
  .write()
  .text("path");

Upvotes: 5

Related Questions