Reputation: 1714
Why does the following code have a compilation error at the return statement?
def getData(queries: Array[String]): Dataset[Row] = {
  val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
  return res
}
Error:
type mismatch; found : Unit required: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
Scala version 2.11.11
Spark version 2.0.0
EDIT: Actual case
def getDataFrames(queries: Array[String]) = {
  val jdbcResult = queries.map(query => {
    val tablename = extractTableName(query)
    if (tablename.contains("1")) {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl1, query, props)
    } else {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl2, query, props)
    }
  })
}
Here I want to return the combined output of the iteration as an Array[Dataset[Row]] or Array[DataFrame] (but DataFrame is not available in 2.0.0 as a dependency). Does the above code do that, or how can I do it?
Upvotes: 0
Views: 8922
Reputation: 23119
You can return a list of DataFrames as List[DataFrame]:
import org.apache.spark.sql.DataFrame

def getData(queries: Array[String]): List[DataFrame] = {
  val res = spark.read.format("jdbc").jdbc(jdbcUrl, "", props)
  // create multiple DataFrames from your queries (??? marks placeholders)
  val df1: DataFrame = ???
  val df2: DataFrame = ???
  val list = List(df1, df2)
  // you can also build the list dynamically from the list of queries
  list
}
registerTempTable returns Unit, so you had better remove the registerTempTable call and return the DataFrame itself, or return a list of DataFrames.
UPDATE:
Here is how you can return an array of DataFrames built from the array of queries:
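import org.apache.spark.sql.DataFrame

def getDataFrames(queries: Array[String]): Array[DataFrame] = {
  val jdbcResult = queries.map(query => {
    val tablename = extractTableName(query)
    // pick the JDBC URL based on the table name
    val dataframe = if (tablename.contains("1")) {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl1, query, props)
    } else {
      spark.sqlContext.read.format("jdbc").jdbc(jdbcUrl2, query, props)
    }
    dataframe
  })
  // return the array itself; ending the block with the val definition would make the result Unit
  jdbcResult
}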
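To see why the final jdbcResult line matters, here is a minimal plain-Scala sketch that needs no Spark. The object LastExpressionDemo and the helper register are hypothetical stand-ins (register plays the role of registerTempTable, which returns Unit); the only point is that a Scala block evaluates to its last expression, and a val definition as the last statement makes that value Unit.

```scala
object LastExpressionDemo {
  // Hypothetical stand-in for a side-effecting call such as registerTempTable: it returns Unit.
  def register(name: String): Unit = ()

  // The block ends with a val definition, so its value and inferred return type are Unit.
  def withoutResult(queries: Array[String]) = {
    val lengths = queries.map(_.length)
  }

  // Ending the block with the value itself makes the inferred return type Array[Int].
  def withResult(queries: Array[String]) = {
    val lengths = queries.map(_.length)
    lengths
  }

  def main(args: Array[String]): Unit = {
    val res: Unit = register("t")                  // same situation as val res = ...registerTempTable("")
    val dropped: Unit = withoutResult(Array("a"))  // compiles only because the result is Unit
    val kept: Array[Int] = withResult(Array("select 1", "select 22"))
    println(kept.mkString(","))                    // prints 8,9
  }
}
```

Declaring an explicit return type such as Array[DataFrame], as the answer does, turns a forgotten final expression into a compile error instead of a silently Unit-typed method.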
I hope this helps!
Upvotes: 2
Reputation: 1631
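It's clear from the error message that there is a type mismatch in your function.
registerTempTable() registers the DataFrame as a temporary table (a named view over the data, not a cached copy) that is scoped to the current session and stays accessible as long as that SparkSession is active.
Its return type is Unit (see the Dataset API documentation), which is why res cannot be returned as a Dataset[Row].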
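A minimal sketch of that session scoping, assuming Spark 2.x on the classpath and a local master; spark.range stands in for the JDBC source, and the names TempViewScopeDemo, isVisible, and ids are made up for illustration. A view registered in one session is not visible from spark.newSession(), which shares the SparkContext but keeps its own temporary views.

```scala
import org.apache.spark.sql.SparkSession

object TempViewScopeDemo {
  // True if `name` is listed as a table or temporary view in the session's current database.
  def isVisible(session: SparkSession, name: String): Boolean =
    session.catalog.listTables().collect().exists(_.name == name)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view-scope").getOrCreate()
    try {
      // createOrReplaceTempView returns Unit; it is called only for its side effect
      spark.range(3).toDF("id").createOrReplaceTempView("ids")
      val other = spark.newSession()   // same SparkContext, separate temporary views
      println(isVisible(spark, "ids")) // true
      println(isVisible(other, "ids")) // false
    } finally spark.stop()
  }
}
```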
Change your code to the following to remove the error:
def getData(queries: Array[String]): Unit = {
  spark.read.format("jdbc").jdbc(jdbcUrl, "", props).registerTempTable("")
}
An even better way is to write the code as follows:
val tempName: String = "Name_Of_Temp_View"
spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView(tempName)
Use createOrReplaceTempView() instead, because registerTempTable() is deprecated as of Spark 2.0.0.
An alternative solution for your requirement:
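import org.apache.spark.sql.{DataFrame, SparkSession}

def getData(queries: Array[String], spark: SparkSession): Array[DataFrame] = {
  // register the JDBC data once, then run each query against the temp view
  spark.read.format("jdbc").jdbc(jdbcUrl, "", props).createOrReplaceTempView("Name_Of_Temp_Table")
  val result: Array[DataFrame] = queries.map(query => spark.sql(query))
  result
}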
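The createOrReplaceTempView plus spark.sql approach can be tried end to end without a database. This is a sketch under stated assumptions: Spark 2.x on the classpath, local mode, and an in-memory DataFrame in place of the JDBC read; TempViewQueries, runQueries, and the people view are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TempViewQueries {
  // Registers `source` under `viewName`, then runs each query against the session.
  def runQueries(spark: SparkSession, source: DataFrame, viewName: String,
                 queries: Array[String]): Array[DataFrame] = {
    source.createOrReplaceTempView(viewName) // returns Unit; called only for its side effect
    queries.map(q => spark.sql(q))           // last expression: the Array[DataFrame] result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view-demo").getOrCreate()
    try {
      import spark.implicits._
      // in-memory stand-in for the JDBC source
      val people = Seq(("alice", 1), ("bob", 2), ("carol", 3)).toDF("name", "id")
      val results = runQueries(spark, people, "people", Array(
        "SELECT name FROM people WHERE id > 1",
        "SELECT COUNT(*) AS n FROM people"))
      results.foreach(_.show())
    } finally spark.stop()
  }
}
```

Keeping the registration and the query execution in one method means every query in the array is guaranteed to see the view it depends on.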
Upvotes: 1