Reputation: 187
Can anyone please explain how Spark works internally when it reads data from one Cassandra table and writes it to another?
Here is my use case:
I am ingesting data from an IoT platform into Cassandra through a Kafka topic. I have a small Python script that parses each Kafka message to get the name of the table it belongs to, prepares a query, and writes it to Cassandra using DataStax's cassandra-driver for Python. With that script I can ingest around 300000 records per minute into Cassandra. However, my incoming data rate is 510000 records per minute, so the Kafka consumer lag keeps increasing.
The Python script already makes concurrent calls to Cassandra. If I increase the number of Python executors, cassandra-driver starts failing because the Cassandra nodes become unavailable to it. I assume I am hitting a limit on the number of Cassandra calls per second. Here is the error message I get:
ERROR Operation failed: ('Unable to complete the operation against any hosts', {<Host: 10.128.1.3 datacenter1>: ConnectionException('Pool is shutdown',), <Host: 10.128.1.1 datacenter1>: ConnectionException('Pool is shutdown',)})
Recently, I ran a PySpark job to copy data from a couple of columns in one table to another. The table had around 168 million records. The job completed in around 5 hours, so it processed over 550000 records per minute.
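As a quick check of the figures quoted so far, the rates can be put on a common per-minute and per-second basis. This is only arithmetic on the numbers given in the question; the variable names are illustrative.

```python
# Figures quoted in the question (records per minute unless noted).
script_rate_per_min = 300_000      # current Python script throughput
incoming_rate_per_min = 510_000    # rate of data arriving on the Kafka topic

# Backlog growth: how fast the Kafka consumer lag increases.
lag_growth_per_min = incoming_rate_per_min - script_rate_per_min

# PySpark copy job: about 168 million records in about 5 hours.
spark_records = 168_000_000
spark_minutes = 5 * 60
spark_rate_per_min = spark_records / spark_minutes

print(f"script:    {script_rate_per_min / 60:.0f} records/s")
print(f"incoming:  {incoming_rate_per_min / 60:.0f} records/s")
print(f"lag grows by {lag_growth_per_min} records/min")
print(f"spark job: {spark_rate_per_min:.0f} records/min")
```

On these numbers the script falls behind by roughly 210000 records every minute, while the copy job averaged about 560000 records per minute.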
Here is the PySpark code I am using:
# sourcetable, sourcekeyspace, field, newtable and newkeyspace are defined elsewhere.
df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table=sourcetable, keyspace=sourcekeyspace)\
    .load().cache()
df.createOrReplaceTempView("data")
# Derive the day from the timestamp and copy the selected column as "value".
query = ("select dev_id,datetime,DATE_FORMAT(datetime,'yyyy-MM-dd') as day, " + field + " as value from data " )
vgDF = spark.sql(query)
vgDF.show(50)
vgDF.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table=newtable, keyspace=newkeyspace)\
    .save()
Versions:
Cluster:
So I am wondering:
Does Spark read all the data from Cassandra first and then write it to the new table, or is there some optimization in the Spark Cassandra Connector that lets it move data between Cassandra tables without reading all the records?
If I replace my Python script with a Spark Streaming job that parses each packet to get the Cassandra table name, will that help me ingest data into Cassandra more quickly?
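The per-message routing step described here could be sketched as follows, whether it runs in a plain Python consumer or inside a streaming job. The message layout (a JSON object with `table` and `row` keys) and the helper name `group_by_table` are assumptions made for illustration; the question does not show the real packet format.

```python
import json
from collections import defaultdict

def group_by_table(raw_messages):
    """Group parsed rows by their target table name.

    Assumes (hypothetically) that each message is a JSON object of the
    form {"table": "<name>", "row": {...}}.
    """
    groups = defaultdict(list)
    for raw in raw_messages:
        msg = json.loads(raw)
        groups[msg["table"]].append(msg["row"])
    return dict(groups)

# Hypothetical sample messages, as they might arrive from Kafka.
sample = [
    '{"table": "temperature", "row": {"dev_id": "a1", "value": 21.5}}',
    '{"table": "humidity", "row": {"dev_id": "a1", "value": 40}}',
    '{"table": "temperature", "row": {"dev_id": "b2", "value": 19.0}}',
]
grouped = group_by_table(sample)
for table, rows in grouped.items():
    print(table, len(rows))
```

Grouping first means the rows for each table can be written together, for example with one reusable statement per table, instead of building a new query for every message.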
Upvotes: 1
Views: 1490
Reputation: 87259
The Spark Cassandra Connector is optimized in that it parallelizes processing and reads and inserts data on the nodes that own it. You may get better throughput by using the Spark Cassandra Connector, but this will require more resources.
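One way to picture this parallelism is as a split of the Cassandra token ring into contiguous ranges, each scanned by a separate Spark task. The sketch below is a simplification and not the connector's actual code: it assumes the Murmur3 partitioner's token range of -2^63 to 2^63 - 1 and cuts it into equal pieces, whereas the real connector also takes data size and node ownership into account. The function name `split_token_ring` is hypothetical.

```python
# Token bounds of Cassandra's Murmur3 partitioner.
MIN_TOKEN = -2**63
MAX_TOKEN = 2**63 - 1

def split_token_ring(num_splits):
    """Cut the full token ring into num_splits contiguous inclusive ranges."""
    total = MAX_TOKEN - MIN_TOKEN + 1   # 2**64 tokens in the ring
    step = total // num_splits
    ranges = []
    start = MIN_TOKEN
    for i in range(num_splits):
        # The last range absorbs any remainder so the ring is fully covered.
        end = MAX_TOKEN if i == num_splits - 1 else start + step - 1
        ranges.append((start, end))
        start = end + 1
    return ranges

ranges = split_token_ring(4)
for low, high in ranges:
    print(low, high)
```

Each range corresponds to a bounded scan on the token of the partition key, so the whole table is still read and then written; the work is spread across tasks rather than skipped.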
As for your task: 300000 inserts/minute is 5000/second, which frankly is not a very big number. You can increase throughput by applying different optimizations:
LOCAL_ONE
should give you very good performance)
Upvotes: 2