whatsinthename

Reputation: 2157

How to save the output of multiple queries to a single JSON file in append mode using Spark Scala

I have 5 queries like below:

select * from table1  
select * from table2  
select * from table3  
select * from table4  
select * from table5  

Now, what I want is to execute these queries sequentially and keep appending their output to a single JSON file. I wrote the code below, but it stores the output of each query in a different part file instead of a single one.
Below is my code:

// standard imports used below (custom helpers such as UDFUtil, JsonUtil, Storage and LOGGER are project-specific and omitted)
import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

def store(jobEntity: JobDetails, jobRunId: Int): Unit = {
    UDFUtil.registerUdfFunctions()
    var outputTableName: String = null
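    // order the queries by sequenceId so they run sequentially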
    val jobQueryMap = jobEntity.jobQueryList.map(jobQuery => (jobQuery.sequenceId, jobQuery))
    val sortedQueries = scala.collection.immutable.TreeMap(jobQueryMap.toSeq: _*).toMap
    LOGGER.debug("sortedQueries ===>" + sortedQueries)
    try {
      outputTableName = jobEntity.destinationEntity
      var resultDF: DataFrame = null
      sortedQueries.values.foreach(jobQuery => {
        LOGGER.debug(s"jobQuery.query ===> ${jobQuery.query}")
        resultDF = SparkSession.builder.getOrCreate.sqlContext.sql(jobQuery.query)

        if (jobQuery.partitionColumn != null && !jobQuery.partitionColumn.trim.isEmpty) {
          resultDF = resultDF.repartition(jobQuery.partitionColumn.split(",").map(col): _*)
        }
        if (jobQuery.isKeepInMemory) {
          resultDF = resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
        }
        if (jobQuery.isCheckpointEnabled) {
          val checkpointDir = ApplicationConfig.getAppConfig(JobConstants.CHECKPOINT_DIR)
          val fs = FileSystem.get(new Storage(JsonUtil.toMap[String](jobEntity.sourceConnection)).asHadoopConfig())
          val path = new Path(checkpointDir)
          if (!fs.exists(path)) {
            fs.mkdirs(path)
          }
          resultDF.explain(true)
          SparkSession.builder.getOrCreate.sparkContext.setCheckpointDir(checkpointDir)
          resultDF = resultDF.checkpoint
        }
        resultDF = {
          if (jobQuery.isBroadCast) {
            import org.apache.spark.sql.functions.broadcast
            broadcast(resultDF)
          } else
            resultDF
        }
        tempViewsList.+=(jobQuery.queryAliasName)
        resultDF.createOrReplaceTempView(jobQuery.queryAliasName)
        //      resultDF.explain(true)
        val map: Map[String, String] = JsonUtil.toMap[String](jobEntity.sinkConnection)
        LOGGER.debug("sink details :: " + map)
        if (resultDF != null && !resultDF.take(1).isEmpty) {
          resultDF.show(false)
          val sinkDetails = new Storage(JsonUtil.toMap[String](jobEntity.sinkConnection))
          val path = sinkDetails.basePath + File.separator + jobEntity.destinationEntity
          println("path::: " + path)
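          // each append here creates a new part file under `path`, which is why the output ends up split across files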
          resultDF.repartition(1).write.mode(SaveMode.Append).json(path)
        }
      })
    } finally {
      // error handling, auditing, and cleanup omitted for brevity
    }
}

Please ignore the other things (checkpointing, logging, auditing) that I am doing in this method along with the reading and writing.

Upvotes: 0

Views: 252

Answers (2)

Nikhil Suthar

Reputation: 2431

Use the below example as a reference for your problem.

I have three tables with JSON data (each with a different schema), as below:

  1. table1 --> Personal Data Table
  2. table2 --> Company Data Table
  3. table3 --> Salary Data Table

I am reading these three tables one by one, sequentially, as per your requirement, and doing a few transformations on the data (exploding the JSON array column) with the help of the list TableColList, which holds each table name and its array column name separated by a colon (":").

OutDFList is the list of all transformed DataFrames.

At the end, I am reducing all the DataFrames in OutDFList into a single DataFrame and writing it to one JSON file.

Note: I have used join to reduce all the DataFrames. You can also use union (if they have the same columns) or whatever fits your requirement; see the short sketch after the code below.

Check below code:

scala> spark.sql("select * from table1").printSchema
root
 |-- Personal: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DOB: string (nullable = true)
 |    |    |-- EmpID: string (nullable = true)
 |    |    |-- Name: string (nullable = true)


scala> spark.sql("select * from table2").printSchema
root
 |-- Company: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- EmpID: string (nullable = true)
 |    |    |-- JoinDate: string (nullable = true)
 |    |    |-- Project: string (nullable = true)


scala> spark.sql("select * from table3").printSchema
root
 |-- Salary: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- EmpID: string (nullable = true)
 |    |    |-- Monthly: string (nullable = true)
 |    |    |-- Yearly: string (nullable = true)

scala> val TableColList = List("table1:Personal", "table2:Company", "table3:Salary")
TableColList: List[String] = List(table1:Personal, table2:Company, table3:Salary)


scala>  val OutDFList = TableColList.map{ X =>
     |  val table = X.split(":")(0)
     |  val arrayColumn = X.split(":")(1)
     |  val df = spark.sql(s"""SELECT * FROM """ + table).select(explode(col(arrayColumn)) as "data").select("data.*")
     | df}
OutDFList: List[org.apache.spark.sql.DataFrame] = List([DOB: string, EmpID: string ... 1 more field], [EmpID: string, JoinDate: string ... 1 more field], [EmpID: string, Monthly: string ... 1 more field])

scala> val FinalOutDF  = OutDFList.reduce((df1, df2) => df1.join(df2, "EmpID"))
FinalOutDF: org.apache.spark.sql.DataFrame = [EmpID: string, DOB: string ... 5 more fields]

scala> FinalOutDF.printSchema
root
 |-- EmpID: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- JoinDate: string (nullable = true)
 |-- Project: string (nullable = true)
 |-- Monthly: string (nullable = true)
 |-- Yearly: string (nullable = true)


scala> FinalOutDF.write.json("/FinalJsonOut")
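
For reference, here is a minimal sketch of the union alternative mentioned in the note above. It only applies when every DataFrame in OutDFList has identical columns (not the case for the three sample tables here), and the output path is only a placeholder:

// Hypothetical: valid only when all DataFrames in OutDFList share the same columns
// (unionByName needs Spark 2.3+; use union if the columns are also in the same order)
val UnionOutDF = OutDFList.reduce((df1, df2) => df1.unionByName(df2))

// a single write call produces one part file inside the output directory
UnionOutDF.coalesce(1).write.json("/FinalJsonUnionOut")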

Upvotes: 1

user6860682

Reputation:

First things first, you need to align the schemas (take the union of their columns):

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF when running outside the spark-shell
val df1 = sc.parallelize(List(
  (42, 11),
  (43, 21)
)).toDF("foo", "bar")

val df2 = sc.parallelize(List(
  (44, true, 1.0),
  (45, false, 3.0)
)).toDF("foo", "foo0", "foo1")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

val unioned = df1.select(expr(cols1, total): _*).union(df2.select(expr(cols2, total): _*))

unioned.show()

And obviously save the result to a single JSON file:

unioned.coalesce(1).write.mode("append").json("/some/path")

UPD

If you are not using DataFrames, just go with plain SQL queries (writing to a single file remains the same: coalesce(1) or repartition(1)):

spark.sql(
  """
    |SELECT id, name
    |FROM (
    | SELECT first.id, first.name FROM first
    | UNION
    | SELECT second.id, second.name FROM second
    | ORDER BY name
    | ) t
  """.stripMargin).show()

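For completeness, here is a minimal sketch of how the same pattern maps back to the five queries from the question. It assumes all five tables share the same schema (otherwise align the columns first, as shown above), and the output path is only a placeholder:

val queries = Seq(
  "select * from table1",
  "select * from table2",
  "select * from table3",
  "select * from table4",
  "select * from table5"
)

// run the queries sequentially, combine the results, and write them out once;
// a single write produces one part file in the target directory instead of one per query
val combined = queries.map(q => spark.sql(q)).reduce((a, b) => a.union(b))
combined.coalesce(1).write.mode("append").json("/some/path")
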
Upvotes: 0
