omer khalid

Reputation: 895

How to Convert a Spark RDD into JSON Using Scala

I am using the MongoDB Spark Connector to read a collection. The goal is to return all the documents present in the collection as one array of JSON documents.

I am able to load the collection, but I am not sure how to convert the customRdd object, which contains the list of documents, to JSON. I can convert the first document, as you can see in the code below, but how do I convert all the documents read from the collection and combine them into one JSON message to send?

Expected Output:

It should be an array of documents:

{
   "objects":[
      {
         ...
      },
      {
         ....
      }
   ]
} 

Existing Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config._
import com.mongodb.spark._
import org.json4s.native.JsonMethods._
import org.json4s.JsonDSL.WithDouble._

val conf = new SparkConf()
  .setAppName("MongoSparkConnectorIntro")
  .setMaster("local")
  .set("spark.hadoop.validateOutputSpecs", "false")
  .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")

val sc = new SparkContext(conf)
val spark = SparkSession.builder()
  .master("spark://192.168.137.103:7077")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred")
  .getOrCreate()

val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1/mystore.mycollection?readPreference=primaryPreferred"))
val customRdd = MongoSpark.load(sc, readConfig)

println("The Count: " + customRdd.count)
println("The First Document: " + customRdd.first.toString())

// Attempt with the json4s DSL; I don't know how to turn this into a JSON string
val resultJson = "MetaDataFinalResponse" -> customRdd.collect().toList

// This converts only the first document, not the whole collection
val stringResponse = customRdd.first().toJson()
println("Final Response: " + stringResponse)
return stringResponse

Note:

I don't want to map the JSON documents into another model; I want them as they are. I just want to aggregate them into one JSON message.
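To illustrate, here is a rough sketch of the kind of aggregation I have in mind (untested on my side; it assumes each document's toJson() output can simply be concatenated into the surrounding array):

// Sketch: serialize each document to JSON on the executors,
// collect the strings, and join them into one JSON message.
val docsJson = customRdd.map(_.toJson).collect()
val jsonMessage = docsJson.mkString("{\"objects\":[", ",", "]}")
println(jsonMessage)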

Spark Version: 2.4.0

SBT File:

name := "Test"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.7.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

Upvotes: 0

Views: 340

Answers (1)

OldWolfs

Reputation: 626

This approach generates the JSON string without escape characters and is much more efficient, but you need to collect the RDD to do it (you can remove the code from my previous answer):

// Build a new Document from the documents fetched from MongoDB
import scala.collection.JavaConverters._
import org.bson.Document

// Collect the RDD and convert it to a java.util.List
// (a Document field can only hold Java collections)
val documents = customRdd.collect().toSeq.asJava

// Create a new Document with the field name you want and serialize it
val stringResponse = new Document().append("objects", documents).toJson()
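Because the collected org.bson.Document instances are appended as objects, toJson() serializes them as nested JSON rather than as escaped strings, which is why no escape characters appear. Keep in mind that collect() brings the whole collection to the driver, so this only works when the result fits in driver memory.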

Upvotes: 1
