Reputation: 110
I'm currently using npm's cassandra-driver to query my Cassandra database from a Node.js server. Since I want to be able to write more complex queries, I'd like to use Spark SQL instead of CQL. Is there any way to create a RESTful API (or something else) so that I can use Spark SQL the same way that I currently use CQL?
In other words, I want to be able to send a Spark SQL query from my Node.js server to another server and get a result back.
Is there any way to do this? I've been searching for solutions to this problem for a while and haven't found anything yet.
Edit: I'm able to query my database with Scala and Spark SQL from the Spark shell, so that bit is working. I just need to connect Spark and my Node.js server somehow.
Upvotes: 3
Views: 2550
Reputation: 17862
I had a similar problem, and I solved by using Spark-JobServer.
The main approach with Spark-Jobserver (SJS) usually is to create a special job that extends their SparkSQLJob such as in the following example:
object ExecuteQuery extends SparkSQLJob {
override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = sqlContext.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
However, for this approach to work well with Cassandra, you have to use the spark-cassandra-connector and then, to load the data you will have two options:
1) Before calling this ExecuteQuery
via REST, you have to transfer the full data you want to query from Cassandra to Spark. For that, you would do something like (code adapted from the spark-cassandra-connector documentation):
val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "words", "keyspace" -> "test"))
.load()
And then register it as a table in order to SparkSQL be able to access it:
df.registerAsTempTable("myTable") // As a temporary table
df.write.saveAsTable("myTable") // As a persistent Hive Table
Only after that you would be able to use the ExecuteQuery
to query from myTable
.
2) As the first option can be inefficient in some use cases, there is another option.
The spark-cassandra-connector has a special CassandraSQLContext that can be used to query C* tables directly from Spark. It can be used like:
val cc = new CassandraSQLContext(sc)
val df = cc.sql("SELECT * FROM keyspace.table ...")
However, to use a different type of context with Spark-JobServer, you need to extend SparkContextFactory
and use it in the moment of context creation (which can be done by a POST request to /contexts
). An example of a special context factory can be seen on SJS Gitub. You also have to create a SparkCassandraJob
, extending SparkJob
(but this part is very easy).
Finally, the ExecuteQuery
job have to be adapted to use the new classes. It would be something like:
object ExecuteQuery extends SparkCassandraJob {
override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = cc.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}
After that, the ExecuteQuery
job can be executed via REST with a POST request.
Conclusion
Here I use the first option because I need the advanced queries available in the HiveContext
(window functions, for example), which are not available in the CassandraSQLContext
. However, if you don't need those kind of operations, I recommend the second approach, even if it needs some extra coding to create a new ContextFactory for SJS.
Upvotes: 2