farazmateen

Reputation: 187

How does Spark copy data between Cassandra tables?

Can anyone please explain the internal workings of Spark when it reads data from one Cassandra table and writes it to another?

Here is my use case:

I am ingesting data coming in from an IoT platform into Cassandra through a Kafka topic. I have a small Python script that parses each message from Kafka to get the table name it belongs to, prepares a query, and writes it to Cassandra using DataStax's cassandra-driver for Python. With that script I am able to ingest around 300,000 records per minute into Cassandra. However, my incoming data rate is 510,000 records per minute, so the Kafka consumer lag keeps increasing.

The Python script is already making concurrent calls to Cassandra. If I increase the number of Python executors, cassandra-driver starts failing because the Cassandra nodes become unavailable to it. I am assuming there is a limit on Cassandra calls per second that I am hitting there. Here is the error message that I get:

ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})
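Roughly, the script follows this pattern (a simplified sketch, assuming kafka-python and the DataStax cassandra-driver; the topic, keyspace, column names, and the parse_message() helper are placeholders, not my actual code):

from kafka import KafkaConsumer
from cassandra.cluster import Cluster

cluster = Cluster(["10.128.1.1", "10.128.1.3"])
session = cluster.connect("my_keyspace")          # placeholder keyspace
consumer = KafkaConsumer("iot-data", bootstrap_servers="kafka:9092")

prepared = {}        # cache one prepared statement per target table
futures = []
MAX_IN_FLIGHT = 100  # cap concurrent requests so the connection pool is not overrun

for msg in consumer:
    table, values = parse_message(msg.value)      # placeholder parser
    if table not in prepared:
        prepared[table] = session.prepare(
            "INSERT INTO " + table + " (dev_id, datetime, value) VALUES (?, ?, ?)")
    futures.append(session.execute_async(prepared[table], values))
    if len(futures) >= MAX_IN_FLIGHT:
        for f in futures:                         # wait for in-flight writes to land
            f.result()
        futures = []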

Recently, I ran a PySpark job to copy data from a couple of columns in one table to another. The table had around 168 million records in it, and the job completed in around 5 hours, so it processed over 550,000 records per minute.

Here is the PySpark code I am using:

# read the source table through the Cassandra connector; cache() keeps the
# DataFrame in memory across the show() and write() actions below
df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()

# register the DataFrame so it can be queried with Spark SQL
df.createOrReplaceTempView("data")

# project the columns to copy, deriving a day column from the timestamp
query = ("select dev_id, datetime, DATE_FORMAT(datetime, 'yyyy-MM-dd') as day, "
         + field + " as value from data")

vgDF = spark.sql(query)
vgDF.show(50)

# append the result to the target table through the same connector
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()
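For completeness, this is roughly how the spark session above is built so that the Cassandra connector is available (the connector coordinates and contact host here are just an example, not my exact setup):

from pyspark.sql import SparkSession

# put the spark-cassandra-connector on the classpath and point it at the cluster
# (match the _2.11:2.3.0 coordinates to your own Spark/Scala version)
spark = SparkSession.builder \
    .appName("cassandra-table-copy") \
    .config("spark.jars.packages",
            "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0") \
    .config("spark.cassandra.connection.host", "10.128.1.1") \
    .getOrCreate()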

Versions:

Cluster:

So I am wondering: how does Spark read from and write to Cassandra internally, and how does it achieve such a high throughput?

Upvotes: 1

Views: 1490

Answers (1)

Alex Ott

Reputation: 87259

The Spark connector is efficient because it parallelizes processing and reads/inserts data on the nodes that own that data. You may get better throughput by using the Cassandra Spark Connector, but this will require more resources.
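Concretely, when you read a table through the connector it splits the Cassandra token ring into many Spark partitions, so each executor scans (and later writes) its own slice in parallel. A small illustration, with placeholder table names:

df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sourcetable", keyspace="sourcekeyspace") \
    .load()

# each Spark partition corresponds to a group of Cassandra token ranges
print(df.rdd.getNumPartitions())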

As for your task: 300,000 inserts/minute is 5,000/second, which frankly is not a very big number. You can increase throughput by applying several optimizations:

  • Use asynchronous calls to submit requests. You only need to make sure that you don't submit more requests than one connection can handle (you can also increase that limit - I'm not sure how to do it in Python, but please check the Java driver docs to get an idea); see the sketch after this list.
  • Use the correct consistency level (LOCAL_ONE should give you very good performance).
  • Use the correct load balancing policy.
  • Run several copies of your script in parallel, making sure they are all in the same Kafka consumer group.
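A minimal sketch of the first three points with the Python driver (assuming driver 3.x execution profiles; the addresses, keyspace, table, and sample rows are placeholders):

from datetime import datetime
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy

# token-aware routing sends each write to a replica that owns the row,
# and LOCAL_ONE keeps write latency low
profile = ExecutionProfile(
    load_balancing_policy=TokenAwarePolicy(
        DCAwareRoundRobinPolicy(local_dc="datacenter1")),
    consistency_level=ConsistencyLevel.LOCAL_ONE)

cluster = Cluster(["10.128.1.1", "10.128.1.3"],
                  execution_profiles={EXEC_PROFILE_DEFAULT: profile})
session = cluster.connect("my_keyspace")

insert = session.prepare(
    "INSERT INTO readings (dev_id, datetime, value) VALUES (?, ?, ?)")

rows = [("dev-1", datetime.utcnow(), 1.0)]  # placeholder batch of rows

# keeps up to `concurrency` asynchronous requests in flight at a time
results = execute_concurrent_with_args(session, insert, rows, concurrency=100)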

Upvotes: 2
