Mark

Reputation: 171

RDD to DataFrame in Spark with Couchbase

I have created an RDD from a NoSQL database and I want to convert the RDD to a DataFrame. I have tried many options, but all of them result in errors.

    val df = sc.couchbaseQuery(test).map(_.value).collect().foreach(println)


{"accountStatus":"AccountOpen","custId":"140034"}
{"accountStatus":"AccountOpen","custId":"140385"}
{"accountStatus":"AccountClosed","subId":"10795","custId":"139698","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11364","custId":"140925","subStatus":"Paused"}
{"accountStatus":"AccountOpen","subId":"10413","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountOpen","subId":"10414","custId":"138842","subStatus":"Active"}
{"accountStatus":"AccountClosed","subId":"11314","custId":"140720","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139166"}
{"accountStatus":"AccountClosed","subId":"10735","custId":"139558","subStatus":"Paused"}
{"accountStatus":"AccountOpen","custId":"139575"}
df: Unit = ()

I have tried adding `.toDF()` to the end of my code, and also creating a schema and using `createDataFrame`, but I receive errors either way. What's the best approach to converting the RDD to a DataFrame?

import org.apache.spark.sql.types._

// The schema is encoded in a string
val schemaString = "accountStatus subId custId subStatus"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//

val peopleDF = spark.createDataFrame(df,schema)

Error

<console>:101: error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
       val peopleDF = spark.createDataFrame(df,schema)

Another attempt:

val df = sc.couchbaseQuery(test).map(_.value).toDF()

Error

<console>:93: error: value toDF is not a member of org.apache.spark.rdd.RDD[com.couchbase.client.java.document.json.JsonObject]
       val df1 = sc.couchbaseQuery(test).map(_.value).toDF()
                                                      ^

Upvotes: 0

Views: 407

Answers (2)

Bhima Rao Gogineni

Reputation: 267

Try as below:

val data = spark.sparkContext
  .couchbaseQuery(N1qlQuery.simple(q), bucket)
  .map(_.value.toString())

spark.read.json(data)

Spark infers the schema from the Couchbase JSON string itself.
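Continuing the snippet above, the resulting DataFrame can be inspected directly (a sketch; `q` and `bucket` are assumed to be defined as in the code above, and a live SparkSession and Couchbase connection are required):

```scala
// Each Couchbase JsonObject is turned into its JSON string form,
// and spark.read.json infers one column per JSON field.
val data = spark.sparkContext
  .couchbaseQuery(N1qlQuery.simple(q), bucket)
  .map(_.value.toString())

val df = spark.read.json(data)
df.printSchema()                               // inferred fields, e.g. accountStatus, custId, subId, subStatus
df.select("custId", "accountStatus").show()    // query like any other DataFrame
```

Note that documents missing a field (e.g. `subId` in some of the sample rows) simply get `null` in that column after inference.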

Upvotes: 1

ImDarrenG

Reputation: 2345

In the first example, you are assigning val df to the result of the call to foreach, which is of type Unit.

Remove the call to collect and foreach and that should work:

// removed collect().foreach() here:
val df = sc.couchbaseQuery(test).map(_.value)
import org.apache.spark.sql.types._

// The schema is encoded in a string
val schemaString = "accountStatus subId custId subStatus"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val peopleDF = spark.createDataFrame(df,schema)

For the second approach, I suspect that Spark SQL doesn't know how to deal with the Couchbase-client `JsonObject`, so try mapping each value to a `String`, then use Spark SQL to read the RDD as JSON.
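A sketch of that second approach, assuming the same `sc.couchbaseQuery(test)` source as in the question (requires a live SparkSession; not runnable standalone):

```scala
// JsonObject has no Encoder, so .toDF() fails on it directly.
// Mapping to the JSON string representation first lets Spark SQL
// parse the text and infer the schema itself.
val jsonStrings = sc.couchbaseQuery(test)
  .map(_.value.toString)        // JsonObject -> JSON string

val df = spark.read.json(jsonStrings)
df.show()
```

In newer Spark versions the `RDD[String]` overload of `read.json` is deprecated in favour of `Dataset[String]`, in which case you can pass `spark.createDataset(jsonStrings)` (with `import spark.implicits._` in scope) instead.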

Upvotes: 0
