Cedric H.

Reputation: 8298

How to work with PySpark, SparkSQL and Cassandra?

I am a bit confused with the different actors in this story: PySpark, SparkSQL, Cassandra and the pyspark-cassandra connector.

As I understand Spark evolved quite a bit and SparkSQL is now a key component (with the 'dataframes'). Apparently there is absolutely no reason to work without SparkSQL, especially if connecting to Cassandra.

So my question is: what components are needed, and how do I connect them together in the simplest way possible?

With spark-shell in Scala I can simply do

./bin/spark-shell --jars spark-cassandra-connector-java-assembly-1.6.0-M1-SNAPSHOT.jar

and then

import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)
cc.setKeyspace("mykeyspace")
val dataframe = cc.sql("SELECT count(*) FROM mytable group by beamstamp")

How can I do that with pyspark?

Here are a couple of subquestions along with partial answers I have collected (correct me if I'm wrong).

Upvotes: 1

Views: 1558

Answers (2)

RussS

Reputation: 16576

Pyspark should be started with the spark-cassandra-connector package as described in the Spark Cassandra Connector python docs.

./bin/pyspark \
  --packages com.datastax.spark:spark-cassandra-connector_$SPARK_SCALA_VERSION:$SPARK_VERSION

With this loaded you will be able to use any of the DataFrame operations already present in Spark on C* DataFrames. See the Spark Cassandra Connector documentation for more details on the available C* DataFrame options.
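For instance, the aggregation from the question can be written with ordinary DataFrame operations. A minimal sketch, assuming the mykeyspace/mytable names from the question and the sqlContext that the pyspark shell pre-creates:

df = (sqlContext
    .read
    .format("org.apache.spark.sql.cassandra")
    .options(table="mytable", keyspace="mykeyspace")
    .load())

# Equivalent of SELECT count(*) ... GROUP BY beamstamp
df.groupBy("beamstamp").count().show()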

To set this up to run with a Jupyter notebook, just set up your environment with the following properties.

export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS=notebook

Calling pyspark will then start up a correctly configured notebook.

There is no need to use pyspark-cassandra unless you are interested in working with RDDs in Python, which has a few performance pitfalls.
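If you do want the RDD route, a rough sketch might look like the following. It assumes pyspark-cassandra is on the classpath and exposes cassandraTable through its CassandraSparkContext; the connection host and the table names (taken from the question) are also assumptions:

from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext  # assumed import from the pyspark-cassandra package

conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")  # assumed Cassandra host
sc = CassandraSparkContext(conf=conf)

# Read the Cassandra table as an RDD and run a plain RDD action on it
rdd = sc.cassandraTable("mykeyspace", "mytable")
print(rdd.count())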

Upvotes: 3

zero323

Reputation: 330453

In Python the connector exposes the DataFrame API. As long as spark-cassandra-connector is available and SparkConf contains the required configuration, there is no need for additional packages. You can simply specify the format and options:

df = (sqlContext
    .read
    .format("org.apache.spark.sql.cassandra")
    .options(table="mytable", keyspace="mykeyspace")
    .load())
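The required configuration mentioned above is essentially the Cassandra connection host. If it is not already in spark-defaults.conf, it can be set when building the context; a sketch, where the host address and app name are assumptions:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# spark.cassandra.connection.host points the connector at the Cassandra cluster
conf = (SparkConf()
    .setAppName("pyspark-cassandra-example")
    .set("spark.cassandra.connection.host", "127.0.0.1"))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)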

If you want to use plain SQL you can register the DataFrame as follows:

df.registerTempTable("mytable")

## Optionally cache
sqlContext.cacheTable("mytable")

sqlContext.sql("SELECT count(*) FROM mytable group by beamstamp")

Advanced features of the connector, like CassandraRDD, are not exposed to Python, so if you need something beyond DataFrame capabilities then pyspark-cassandra may prove useful.

Upvotes: 2
