Reputation: 1142
I have been working with Spark 3.1.2 and Scala 2.12. I want to parallelize some keys among the nodes so that each node reads its own data based on the key it receives. So, I must work with an RDD first and then convert it to a Spark DataFrame. I read the data from a table in an Oracle database. The code is the following:
import java.sql.ResultSet

import oracle.jdbc.pool.OracleDataSource
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object managementData extends App {
  val num_node = 2

  def read_data(group_id: Int): String = {
    val table_name = "table"
    val col_name = "col"
    val query =
      s"""select f1, f2, f3, f4, f5, f6, f7, f8
         |from $table_name
         |where MOD(TO_NUMBER(SUBSTR($col_name, -LEAST(2, LENGTH($col_name)))), $num_node) = $group_id
         |""".stripMargin
    val oracleUser = "ORCL"
    val oraclePassword = "XXXXXXXX"
    val oracleURL = "jdbc:oracle:thin:@//X.X.X.X:1521/ORCLDB"
    val ods = new OracleDataSource()
    ods.setUser(oracleUser)
    ods.setURL(oracleURL)
    ods.setPassword(oraclePassword)
    val con = ods.getConnection()
    val statement = con.createStatement()
    statement.setFetchSize(1000) // important
    val resultSet: ResultSet = statement.executeQuery(query)
    Iterator.continually(resultSet)
      .takeWhile(_.next)
      .flatMap(r => (1 to 8).map(i => r.getString(i))) // read all eight columns
      .mkString(" ")
  }

  def udf(rdd: RDD[String]): DataFrame = {
    val spark = SparkSession.builder.getOrCreate()
    val schema = new StructType()
      .add(StructField("f1", StringType, true))
      .add(StructField("f2", StringType, true))
      .add(StructField("f3", StringType, true))
      .add(StructField("f4", StringType, true))
      .add(StructField("f5", StringType, true))
      .add(StructField("f6", StringType, true))
      .add(StructField("f7", IntegerType, true))
      .add(StructField("f8", IntegerType, true))
    val df = spark.createDataFrame(rdd, schema)
    df
  }

  def main(): Unit = {
    val group_list = Seq.range(1, 2, 1) // making a list
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("testScala")
      .set("spark.executor.memory", "8g")
      .set("spark.executor.cores", "2")
      .set("spark.task.cpus", "1")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(group_list, num_node)
      .map(read_data)
    rdd.collect().foreach(println) // print the fetched strings on the driver
    udf(rdd)
  }

  main()
}
read_data works perfectly. However, I cannot convert the RDD to a Spark DataFrame, and I receive this error:
overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass:
Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass:
Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass:
Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType)
val df = spark.createDataFrame(rdd, schema)
Would you please guide me on what is wrong with the createDataFrame method when converting an RDD to a Spark DataFrame?
Any help is really appreciated.
Upvotes: 2
Views: 737
Reputation: 18108
You need this type of approach: createDataFrame with a StructType schema expects an RDD[Row], not an RDD[String], so map your strings into Rows first.
...
val schema = new StructType()
  .add(StructField("f", StringType, true))
  .add(StructField("m", StringType, true))
  .add(StructField("l", StringType, true))
  .add(StructField("d", StringType, true))
  .add(StructField("g", StringType, true))
  .add(StructField("v", IntegerType, true))
val df = spark.createDataFrame(rdd, schema)
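For completeness, here is a minimal sketch of the elided conversion step, assuming the strings produced by read_data are space-separated fields in the order f1..f8 (as in the question) and that no field itself contains a space; the toDataFrame helper and the split logic are illustrative, not part of the original answer:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// createDataFrame(rowRDD, schema) requires an RDD[Row], so each
// space-separated string is split and mapped into Rows whose values
// match the schema types (strings for f1..f6, ints for f7 and f8).
def toDataFrame(spark: SparkSession, rdd: RDD[String]): DataFrame = {
  val schema = new StructType()
    .add(StructField("f1", StringType, true))
    .add(StructField("f2", StringType, true))
    .add(StructField("f3", StringType, true))
    .add(StructField("f4", StringType, true))
    .add(StructField("f5", StringType, true))
    .add(StructField("f6", StringType, true))
    .add(StructField("f7", IntegerType, true))
    .add(StructField("f8", IntegerType, true))

  // read_data concatenates every record of a group into one string,
  // so split each element and regroup it into records of 8 fields.
  val rowRdd: RDD[Row] = rdd.flatMap { s =>
    s.split(" ").grouped(8).map { t =>
      Row(t(0), t(1), t(2), t(3), t(4), t(5), t(6).toInt, t(7).toInt)
    }
  }
  spark.createDataFrame(rowRdd, schema)
}
With this conversion in place, createDataFrame resolves to the (rowRDD: RDD[Row], schema: StructType) overload listed in the error message and the code compiles.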
Upvotes: 1