I have created an RDD from a NoSQL database (Couchbase) and I want to convert the RDD to a DataFrame. I have tried many options but all result in errors.
val df = sc.couchbaseQuery(test).map(_.value).collect().foreach(println)
{"accountStatus":"AccountOpen","custId":"140034"}
{"accountStatus":"AccountOpen","custId":"140385"}
{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11364","custId":"140925","subStatus":"Paused"}
{"accountStatus":"AccountOpen","subId":"10413","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountOpen","subId":"10414","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11314","custId":"140720","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139166"}
{"accountStatus":"AccountClosed","subId":"10735","custId":"139558","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139575"}
df: Unit = ()
I have tried adding .toDF() to the end of my code and also creating a schema and using createDataFrame, but both give errors. What's the best approach to converting the RDD to a DataFrame?
import org.apache.spark.sql.types._
// The schema is encoded in a string
val schemaString = "accountStatus subId custId subStatus"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
//
val peopleDF = spark.createDataFrame(df,schema)
Error
<console>:101: error: overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
val peopleDF = spark.createDataFrame(df,schema)
Other attempt:
val df = sc.couchbaseQuery(test).map(_.value).toDF()
Error
<console>:93: error: value toDF is not a member of org.apache.spark.rdd.RDD[com.couchbase.client.java.document.json.JsonObject]
val df1 = sc.couchbaseQuery(test).map(_.value).toDF()
^
Try the following:
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._  // enables couchbaseQuery on the SparkContext

val data = spark.sparkContext
  .couchbaseQuery(N1qlQuery.simple(q), bucket)
  .map(_.value.toString())
spark.read.json(data)
Spark infers the schema from the Couchbase JSON string itself.
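To see roughly what that inference does with these documents, here is a toy illustration in plain Scala (crude regex key extraction for demonstration only; Spark uses a real JSON parser): spark.read.json scans the JSON lines and takes the union of all keys, so documents missing subId/subStatus simply get null in those columns.

```scala
// Toy illustration of schema inference: union of keys across JSON lines.
val lines = Seq(
  """{"accountStatus":"AccountOpen","custId":"140034"}""",
  """{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"}"""
)
// Extract key names with a regex (illustration only, not a JSON parser)
val keyPattern = "\"([A-Za-z]+)\":".r
val inferredColumns = lines
  .flatMap(line => keyPattern.findAllMatchIn(line).map(_.group(1)))
  .distinct
println(inferredColumns) // List(accountStatus, custId, subId, subStatus)
```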
In the first example, you are assigning val df to the result of the call to foreach, which has type Unit.
Remove the call to collect and foreach and that should work:
import org.apache.spark.sql.Row

// removed collect().foreach(); also map each JsonObject to a Row, so the
// (RDD[Row], StructType) overload of createDataFrame applies
val df = sc.couchbaseQuery(test).map(_.value).map(v =>
  Row(v.getString("accountStatus"), v.getString("subId"),
      v.getString("custId"), v.getString("subStatus")))
import org.apache.spark.sql.types._
// The schema is encoded in a string
val schemaString = "accountStatus subId custId subStatus"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val peopleDF = spark.createDataFrame(df,schema)
For the second approach, I suspect Spark SQL doesn't know how to deal with the Couchbase client's JsonObject. Try mapping each value to a String first, then use spark.read.json to read the RDD as JSON.
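Besides the JSON-string route, another common way to make toDF() compile is to map each document into a case class: RDDs of case classes pick up Spark's implicit encoders via import spark.implicits._, while RDD[JsonObject] has none. A sketch, where the getString accessors on the Couchbase JsonObject and the field names follow the question; the local helper just shows the shaping step without a cluster:

```scala
// A case class gives toDF() an encoder; JsonObject alone has none.
case class Account(accountStatus: String, subId: String, custId: String, subStatus: String)

// In Spark this would be (sketch, assuming JsonObject.getString):
//   import spark.implicits._
//   val df = sc.couchbaseQuery(test).map(_.value)
//     .map(v => Account(v.getString("accountStatus"), v.getString("subId"),
//                       v.getString("custId"), v.getString("subStatus")))
//     .toDF()

// The same shaping step locally, with a plain Map standing in for a document:
def fromFields(m: Map[String, String]): Account =
  Account(m.getOrElse("accountStatus", null), m.getOrElse("subId", null),
          m.getOrElse("custId", null), m.getOrElse("subStatus", null))

val acct = fromFields(Map("accountStatus" -> "AccountOpen", "custId" -> "140034"))
println(acct) // Account(AccountOpen,null,140034,null)
```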