Devas

Reputation: 1714

type mismatch; found : Unit required: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]

Why does the following code produce a compilation error at the return statement?

  def getData(queries: Array[String]): Dataset[Row] = {
    val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
    return res
  }

Error:

type mismatch; found : Unit required: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]

Scala version 2.11.11

Spark version 2.0.0

EDIT: Actual case

  def getDataFrames(queries: Array[String]) = {
    val jdbcResult = queries.map(query => {
      val tablename = extractTableName(query)
      if (tablename.contains("1")) {
        spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl1, query, props)
      } else {
        spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl2, query, props)
      }
    })
  }

Here I want to return the combined output of the iteration as an Array[Dataset[Row]] or Array[DataFrame] (but DataFrame is not available in 2.0.0 as a dependency). Does the above code do that? If not, how can I do it?

Upvotes: 0

Views: 8922

Answers (2)

koiralo

Reputation: 23119

You can return a list of DataFrames as List[DataFrame]:

def getData(queries: Array[String]): List[DataFrame] = {
  val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props)
  // Create one DataFrame per query
  val df1: DataFrame = ???
  val df2: DataFrame = ???
  val list = List(df1, df2)
  // The list can also be built dynamically from the array of queries
  list
}

registerTempTable returns Unit, which is why the compiler reports the type mismatch. Remove the registerTempTable call so that the method returns the DataFrame itself, or return a list of DataFrames.
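A minimal sketch of that point, assuming a local SparkSession and using spark.range as a stand-in for the JDBC read (the object name, method name, and view name are hypothetical). The view is registered as a separate statement, and the DataFrame is the last expression of the method. In Spark 2.x, DataFrame is a type alias for Dataset[Row] defined in the org.apache.spark.sql package object, so it can be imported as shown.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TempViewSketch {
  // Registers the view as a side effect and returns the DataFrame itself.
  def loadAndRegister(spark: SparkSession, viewName: String): DataFrame = {
    // Assumption: spark.range stands in for spark.read.jdbc(...)
    val df = spark.range(0, 3).toDF("id")
    // createOrReplaceTempView returns Unit, so it must not be the last expression
    df.createOrReplaceTempView(viewName)
    df
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("temp-view-sketch")
      .getOrCreate()
    val df = loadAndRegister(spark, "ids")
    df.show()
    spark.sql("SELECT id FROM ids WHERE id > 0").show()
    spark.stop()
  }
}
```

Chaining the registration call onto the read, as in the question, makes the whole expression evaluate to Unit, which is what the compiler complains about.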

UPDATE:

Here is how you can return an array of DataFrames from an array of queries:

def getDataFrames(queries: Array[String]): Array[DataFrame] = {
  val jdbcResult = queries.map(query => {
    val tablename = extractTableName(query)
    // Pick the JDBC URL based on the table name, as in the question
    val dataframe = if (tablename.contains("1")) {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl1, query, props)
    } else {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl2, query, props)
    }
    dataframe
  })
  // The mapped array must be the last expression so that it is returned
  jdbcResult
}
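The difference from the version in the question can be illustrated without Spark. The following is a rough sketch in which Frame is a hypothetical stand-in for a DataFrame: a block that ends with a val definition evaluates to Unit, while a block whose last expression is the mapped array returns that array.

```scala
object LastExpressionSketch {
  // Hypothetical stand-in for a DataFrame, so the sketch runs without Spark.
  final case class Frame(source: String, query: String)

  // Like the question's version: the block ends with a val definition,
  // so the method's result is Unit and the mapped array is discarded.
  def discarded(queries: Array[String]): Unit = {
    val frames = queries.map(q => Frame("db", q))
  }

  // if/else is an expression, and map collects its value for each query.
  def collected(queries: Array[String]): Array[Frame] =
    queries.map { q =>
      if (q.contains("1")) Frame("db1", q) else Frame("db2", q)
    }

  def main(args: Array[String]): Unit = {
    collected(Array("t1", "t2")).foreach(println)
  }
}
```

The same rule applies to the Spark version: the array is returned only when it is the last expression of the method body.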

I hope this helps!

Upvotes: 2

Yayati Sule

Reputation: 1631

It's clear from the error message that there is a type mismatch in your function. The registerTempTable() API registers the DataFrame as a temporary table scoped to the current session, which stays accessible while the SparkSession is active. It returns Unit, not a Dataset.

Check the return type of the registerTempTable() API in the Spark API documentation.

Change your code to the following to remove the error:

def getData(queries: Array[String]): Unit = {
  // registerTempTable returns Unit, so the method's return type is Unit as well
  spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
}

An even better way would be to write the code as follows:

val tempName: String = "Name_Of_Temp_View"
spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView(tempName)

Use createOrReplaceTempView(), because registerTempTable() has been deprecated since Spark 2.0.0.

An alternative solution that matches your requirement:

def getData(queries: Array[String], spark: SparkSession): Array[DataFrame] = {
  spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView("Name_Of_Temp_Table")
  val result: Array[DataFrame] = queries.map(query => spark.sql(query))
  result
}
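As a rough illustration of this approach, the sketch below runs an array of SQL strings against a registered temp view. It assumes a local SparkSession and uses spark.range in place of the JDBC source; the object name, the runAll helper, and the view name numbers are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object QueryArraySketch {
  // Runs each SQL string against the views registered in the session.
  def runAll(spark: SparkSession, queries: Array[String]): Array[DataFrame] =
    queries.map(query => spark.sql(query))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("query-array-sketch")
      .getOrCreate()

    // Assumption: spark.range stands in for the JDBC-backed DataFrame
    spark.range(0, 5).toDF("id").createOrReplaceTempView("numbers")

    val frames = runAll(spark, Array(
      "SELECT id FROM numbers WHERE id < 2",
      "SELECT id FROM numbers WHERE id >= 2"
    ))
    frames.foreach(_.show())
    spark.stop()
  }
}
```

The queries must refer to the registered view name, since spark.sql resolves table names against the session's catalog rather than against the remote database.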

Upvotes: 1
