Ulyses

Reputation: 121

PySpark and Cassandra

I am very confused about how to use pyspark and the Cassandra connector. Some posts say it is just a matter of using the SparkContext and SparkSession from pyspark, and other posts say those don't work and that I have to use pyspark-cassandra. Can someone please tell me the right way to connect to a remote DataStax Cassandra database with pyspark?

This is what I want to do, but I have seen so many posts and none has worked entirely. If possible, I don't want to use the pyspark shell directly; I want to do everything in Python code in a code editor, i.e. not within the Spark terminal.

Thanks

Upvotes: 2

Views: 4891

Answers (1)

Alex Ott

Reputation: 87359

When people mention pyspark-cassandra, it is mostly because it exposes the RDD part of the Spark Cassandra Connector (SCC), which SCC itself does not expose (for Python it exposes only the Dataframe API).

How to use SCC with Astra is described quite well in the SCC 2.5.0 release announcement blog post and in the documentation. You start pyspark with the following command (you may specify the username, password, and other parameters inside your code instead of on the command line; only --packages has to stay on the command line):

pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 \
  --files path_to/secure-connect-test.zip \
  --conf spark.cassandra.connection.config.cloud.path=secure-connect-test.zip \
  --conf spark.cassandra.auth.username=UserName \
  --conf spark.cassandra.auth.password=Password \
  --conf spark.dse.continuousPagingEnabled=false

Please note the flag disabling continuous paging - it's required right now.
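
Since the question asks for plain Python code in an editor rather than the pyspark shell, here is a minimal sketch of how the same settings could be passed through SparkSession.builder in a standalone script. The app name is arbitrary, the bundle path, credentials, and table/keyspace are the placeholders from the command above, and, per the note above about --packages, the connector package itself would still be supplied on the command line (e.g. via spark-submit --packages) when launching the script.

from pyspark.sql import SparkSession

# Sketch: build the session with the same settings as the pyspark command above.
# Paths, credentials, and table/keyspace names are placeholders from this answer.
spark = SparkSession.builder \
    .appName("astra-example") \
    .config("spark.files", "path_to/secure-connect-test.zip") \
    .config("spark.cassandra.connection.config.cloud.path", "secure-connect-test.zip") \
    .config("spark.cassandra.auth.username", "UserName") \
    .config("spark.cassandra.auth.password", "Password") \
    .config("spark.dse.continuousPagingEnabled", "false") \
    .getOrCreate()

# The same Dataframe API calls shown below work unchanged from a script
df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="t2", keyspace="test").load()
df.show()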

Once pyspark has started, just execute Spark commands that read, transform, and write data:

>>> from pyspark.sql.functions import col

# read data
>>> data = spark.read.format("org.apache.spark.sql.cassandra")\
   .options(table="t2", keyspace="test").load()
>>> data.count()
5
>>> data.show(5, truncate = False)
+---+-----------------------+
|id |tm                     |
+---+-----------------------+
|4  |2020-06-23 10:37:25.825|
|3  |2020-06-23 10:37:25.754|
|5  |2020-06-23 10:37:25.852|
|1  |2020-06-23 10:37:25.701|
|2  |2020-06-23 10:37:25.726|
+---+-----------------------+

# generate new data frame
>>> data2 = data.select((col("id") + 10).alias("id"), col("tm"))
>>> data2.show()
+---+--------------------+
| id|                  tm|
+---+--------------------+
| 13|2020-06-23 10:37:...|
| 14|2020-06-23 10:37:...|
| 15|2020-06-23 10:37:...|
| 11|2020-06-23 10:37:...|
| 12|2020-06-23 10:37:...|
+---+--------------------+

# write the data
>>> data2.write.format("org.apache.spark.sql.cassandra")\
  .options(table="t2", keyspace="test").mode("append").save()

# check that data is written
>>> spark.read.format("org.apache.spark.sql.cassandra")\
  .options(table="t2", keyspace="test").load().count()
10

Upvotes: 4
