Kurt Maile

Reputation: 1267

Spark Couchbase Connector - N1QL RDD to DataFrame

I'm trying to convert an RDD from Couchbase into a DataFrame (Scala 2.11, Spark 2.1), but I get an overloaded-method error. My code is below; any ideas? Another thread didn't quite answer this.

I'm doing this in a Databricks notebook. I use the Couchbase connector for pure DataFrames without problems, but when I want to run a custom, more bespoke N1QL query, the best I can figure out is to go through an RDD first. Is that right?

Firstly, is there a better way to execute this query natively as a DataFrame? I think I need to use N1QL and an RDD, or am I missing something here?

Please let me know what I'm doing wrong with the RDD conversion code below. The createDataFrame call fails with ":84: error: overloaded method value createDataFrame with alternatives: ...". Thanks!

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._ // adds couchbaseQuery to SparkContext

val reconciliationSchema =
  new StructType()
    .add("numEvents", IntegerType)
    .add("eventCategory", StringType)
    .add("eventName", StringType)

val orderEventsCouchbaseQuery = """
  SELECT
    count(*) AS numEvents, event.eventCategory, event.eventName
  FROM
    events
  WHERE
    STR_TO_UTC(event.eventOccurredTime)
      BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
  GROUP BY event.eventCategory, event.eventName
  ORDER BY event.eventCategory, event.eventName
"""

val queryResultRDD = sc.couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery), "events").map(_.value)
// This call produces the "overloaded method value createDataFrame" error
val queryResultDF: DataFrame = spark.createDataFrame(queryResultRDD, reconciliationSchema)
display(queryResultDF)

Upvotes: 0

Views: 264

Answers (1)

daschl

Reputation: 1124

I think the problem you are running into is not so much a Couchbase-related one, but rather a Spark/Scala typing issue. When you call createDataFrame with a StructType schema, Spark needs an RDD of Row (RDD[Row]), not the type returned by the Couchbase query. Mapping the query RDD with .map(_.value) gives you an RDD of JsonObject, so none of the createDataFrame overloads match.

Here is some similar sample code showing that it works fine once each result is turned into a Row:

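import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._ // adds couchbaseQuery to SparkContext

val query = N1qlQuery.simple(
  "select country, count(*) as count " +
  "from `travel-sample` " +
  "where type = 'airport' " +
  "group by country " +
  "order by count desc")

val schema = StructType(
  StructField("count", IntegerType) ::
  StructField("country", StringType) :: Nil
)

// Convert each query result (a JsonObject) into a Row whose values follow the schema order
val rdd = spark.sparkContext
  .couchbaseQuery(query)
  .map(r => Row(r.value.getInt("count"), r.value.getString("country")))

spark.createDataFrame(rdd, schema).show()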
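Applied to the query from the question, the same Row-based fix might look like the sketch below. It assumes a Databricks notebook or spark-shell session where spark is already defined, that the connector is configured with access to the events bucket, and that N1QL returns event.eventCategory and event.eventName under their default field names eventCategory and eventName. The names reconciliationRows and reconciliationDF are illustrative, not from the original thread.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._ // adds couchbaseQuery to SparkContext

// Same schema as in the question; the Row(...) values below must follow this field order
val reconciliationSchema = new StructType()
  .add("numEvents", IntegerType)
  .add("eventCategory", StringType)
  .add("eventName", StringType)

val orderEventsCouchbaseQuery = """
  SELECT count(*) AS numEvents, event.eventCategory, event.eventName
  FROM events
  WHERE STR_TO_UTC(event.eventOccurredTime)
    BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
  GROUP BY event.eventCategory, event.eventName
  ORDER BY event.eventCategory, event.eventName
"""

// Assumption: the un-aliased projections come back as "eventCategory" and "eventName"
val reconciliationRows = spark.sparkContext
  .couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery), "events")
  .map { r =>
    val v = r.value // JsonObject for one result row
    Row(v.getInt("numEvents"), v.getString("eventCategory"), v.getString("eventName"))
  }

// RDD[Row] + StructType matches a createDataFrame overload, so this compiles
val reconciliationDF: DataFrame = spark.createDataFrame(reconciliationRows, reconciliationSchema)
reconciliationDF.show()
```

Spark matches Row values to schema fields by position, not by name, so keeping the Row(...) arguments in the same order as the StructType is the important part. If mapping each field by hand gets tedious, a different technique (not the one used above) is to serialize each JsonObject back to a JSON string and let Spark parse it, e.g. spark.read.schema(reconciliationSchema).json(rdd.map(_.value.toString)), since Spark 2.1's DataFrameReader.json accepts an RDD[String].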

Upvotes: 1
