M_Gh

Reputation: 1142

How to convert Spark RDD to Spark DataFrame

I am working with Spark 3.1.2 and Scala 2.12. I want to parallelize some keys among the nodes so that each node reads its own data based on the key it receives, so I have to work with an RDD first and then convert it to a Spark DataFrame. I read the data from a table in an Oracle database. The code is as follows:

    import oracle.jdbc.pool.OracleDataSource
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object managementData extends App {
      val num_node = 2

      def read_data(group_id: Int): String = {
        val table_name = "table"
        val col_name = "col"
        val query =
          """ select f1,f2,f3,f4,f5,f6,f7,f8
             | from """.stripMargin + table_name + """ where MOD(TO_NUMBER(substr(""" + col_name + """, -LEAST(2, LENGTH(""" + col_name + """)))),""" + num_node + """)=""" + group_id
        val oracleUser = "ORCL"
        val oraclePassword = "XXXXXXXX"
        val oracleURL = "jdbc:oracle:thin:@//X.X.X.X:1521/ORCLDB"
        val ods = new OracleDataSource()
        ods.setUser(oracleUser)
        ods.setURL(oracleURL)
        ods.setPassword(oraclePassword)
        val con = ods.getConnection()
        val statement = con.createStatement()
        statement.setFetchSize(1000)      // important
        val resultSet: java.sql.ResultSet = statement.executeQuery(query)
        val ret = Iterator.continually(resultSet)
          .takeWhile(_.next)
          .flatMap(r => (1 until 8).map(i => r.getString(i)))
          .mkString(" ")
        return ret
      }

      def udf(rdd: RDD[String]): DataFrame = {
        val spark = SparkSession
          .builder.getOrCreate()
        val schema = new StructType()
          .add(StructField("f1", StringType, true))
          .add(StructField("f2", StringType, true))
          .add(StructField("f3", StringType, true))
          .add(StructField("f4", StringType, true))
          .add(StructField("f5", StringType, true))
          .add(StructField("f6", StringType, true))
          .add(StructField("f7", IntegerType, true))
          .add(StructField("f8", IntegerType, true))
        val df = spark.createDataFrame(rdd, schema)
        return df
      }

      def main(): Unit = {
        val group_list = Seq.range(1, 2, 1) // making a list
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("testScala")
          .set("spark.executor.memory", "8g")
          .set("spark.executor.cores", "2")
          .set("spark.task.cpus", "1")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(group_list, num_node)
          .map(read_data)
        rdd.map(x => println(x)).collect()
        udf(rdd)
      }

      main()
    }

read_data works perfectly, but I cannot convert the RDD to a Spark DataFrame and I receive this error:

 overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: 
  Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: 
  Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: 
  Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema:  org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
   cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType)
val df = spark.createDataFrame(rdd, schema)

Would you please guide me on what is wrong with the createDataFrame call when converting an RDD to a Spark DataFrame?

Any help is really appreciated.

Upvotes: 2

Views: 737

Answers (1)

Ged

Reputation: 18108

You need this type of approach: createDataFrame with a StructType schema expects an RDD[Row], not an RDD[String], so define the schema and pass a row RDD:

    ...
    val schema = new StructType()
      .add(StructField("f", StringType, true))
      .add(StructField("m", StringType, true))
      .add(StructField("l", StringType, true))
      .add(StructField("d", StringType, true))
      .add(StructField("g", StringType, true))
      .add(StructField("v", IntegerType, true))

    val df = spark.createDataFrame(rdd, schema)
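
For completeness, a minimal sketch of the full conversion, assuming the RDD has already been reshaped so that each element is one space-separated record of eight fields (the question's read_data currently concatenates all rows of a group into a single string, so it would need a flatMap on a record delimiter first). The helper name toDataFrame and the split logic are illustrative, not the answerer's exact code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    // Build a DataFrame from an RDD of space-separated records.
    // Assumes eight fields per record; f7 and f8 are parsed as integers.
    def toDataFrame(spark: SparkSession, rdd: RDD[String]): DataFrame = {
      val schema = new StructType()
        .add(StructField("f1", StringType, true))
        .add(StructField("f2", StringType, true))
        .add(StructField("f3", StringType, true))
        .add(StructField("f4", StringType, true))
        .add(StructField("f5", StringType, true))
        .add(StructField("f6", StringType, true))
        .add(StructField("f7", IntegerType, true))
        .add(StructField("f8", IntegerType, true))

      // Map each raw string to a Row whose values match the schema types.
      val rowRdd: RDD[Row] = rdd.map { line =>
        val p = line.split(" ")
        Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6).toInt, p(7).toInt)
      }

      // This is the overload that matches: createDataFrame(rowRDD: RDD[Row], schema: StructType)
      spark.createDataFrame(rowRdd, schema)
    }

The key step is the map to Row: once the RDD contains Row objects whose values match the schema types, the createDataFrame(rowRDD: RDD[Row], schema: StructType) overload listed in the error message applies.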

Upvotes: 1
