Reputation: 1267
I'm trying to convert an RDD from Couchbase into a DataFrame (Scala 2.11 and Spark 2.1) but get an overloaded-method error. My code is below; any ideas? Another thread didn't quite answer this.
I'm doing this in a Databricks notebook. I use the Couchbase connector with plain DataFrames without problems, but if I want to run a custom N1QL query, something more bespoke, the best I can figure out is to go through RDDs first?
Firstly, is there a better way to execute this query natively as a DataFrame? I think I need to use N1QL and an RDD, or am I missing something here?
Please let me know what I'm doing wrong with the RDD conversion code below. I also get the ":84: error: overloaded method value createDataFrame with alternatives: ..." error. Thanks!
val reconciliationSchema =
  new StructType()
    .add("numEvents", IntegerType)
    .add("eventCategory", StringType)
    .add("eventName", StringType)
val orderEventsCouchbaseQuery = """
SELECT
count(*) as numEvents, event.eventCategory, event.eventName
FROM
events
WHERE
STR_TO_UTC(event.eventOccurredTime)
BETWEEN STR_TO_UTC("2017-06-16") AND STR_TO_UTC("2017-06-26")
GROUP BY event.eventCategory, event.eventName
order by event.eventCategory, event.eventName
"""
val queryResultRDD = sc.couchbaseQuery(N1qlQuery.simple(orderEventsCouchbaseQuery),"events").map(_.value)
val queryResultDF: DataFrame = spark.createDataFrame(queryResultRDD,reconciliationSchema)
display(queryResultDF)
Upvotes: 0
Views: 264
Reputation: 1124
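I think the problem you are running into is not so much a Couchbase-related one, but rather a Spark/Scala typing issue. When you call createDataFrame with a schema, Spark needs an RDD of Row objects, not an RDD of whatever the Couchbase query returns (after .map(_.value), that is an RDD of JsonObject). Because none of the createDataFrame overloads accepts an RDD of JsonObject together with a StructType, the compiler reports the "overloaded method value" error.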
Here is some similar sample code showing that it works once each result is turned into a Row:
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._ // adds couchbaseQuery to SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val query = N1qlQuery.simple("" +
  "select country, count(*) as count " +
  "from `travel-sample` " +
  "where type = 'airport' " +
  "group by country " +
  "order by count desc")
val schema = StructType(
  StructField("count", IntegerType) ::
  StructField("country", StringType) :: Nil
)
// Build one Row per query result, with fields in the same order as the schema
val rdd = spark.sparkContext
  .couchbaseQuery(query)
  .map(r => Row(r.value.getInt("count"), r.value.getString("country")))
spark.createDataFrame(rdd, schema).show()
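Applied to the query from the question, the same Row-mapping idea might look like the sketch below. It assumes the Couchbase Spark connector 2.x used in the question, where couchbaseQuery returns rows whose value is a JsonObject, and it relies on N1QL naming the projected fields eventCategory and eventName after the last path component. The names ReconciliationQuery, toRow and load are hypothetical, chosen only for illustration.

```scala
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._ // adds couchbaseQuery to SparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical helper object (not part of the connector API).
object ReconciliationQuery {
  // Schema from the question; the Row built in toRow must use the same field order.
  val reconciliationSchema: StructType = new StructType()
    .add("numEvents", IntegerType)
    .add("eventCategory", StringType)
    .add("eventName", StringType)

  // Converts one N1QL result object into a Row in schema order.
  // getInt/getString return null for missing fields, which the
  // (nullable by default) schema fields accept.
  def toRow(json: JsonObject): Row =
    Row(
      json.getInt("numEvents"),
      json.getString("eventCategory"),
      json.getString("eventName"))

  // Runs the N1QL query against the "events" bucket and builds a DataFrame
  // from an RDD[Row], which matches the createDataFrame(RDD[Row], StructType) overload.
  def load(spark: SparkSession, query: String): DataFrame = {
    val rowRDD = spark.sparkContext
      .couchbaseQuery(N1qlQuery.simple(query), "events")
      .map(r => toRow(r.value))
    spark.createDataFrame(rowRDD, reconciliationSchema)
  }
}
```

In the notebook this would be used as display(ReconciliationQuery.load(spark, orderEventsCouchbaseQuery)). Keeping the JsonObject-to-Row conversion in its own function makes the field order easy to check against the schema, which is the part that has to line up for createDataFrame to work.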
Upvotes: 1