Shrinivas Suresh

Reputation: 1

DataFrame write 10x slower than RDD save to cassandra in spark

I compared saving a table with 30,000 records to Cassandra using an RDD and a Dataset, and found that the Dataset save was 10 times slower than the RDD save. The table has 4 partitioning keys.

 DSE version: 5.1.7
 Spark version: 2.0.1
 Nodes: 6 (20 cores each, 6 GB)
 Using Spark Standalone

We used the following Spark configurations (a sketch of applying them to a SparkConf follows the list):

  1. spark.scheduler.listenerbus.eventqueue.size=100000
  2. spark.locality.wait=1
  3. spark.dse.continuous_paging_enabled=false
  4. spark.cassandra.input.fetch.size_in_rows=500
  5. spark.cassandra.connection.keep_alive_ms=10000
  6. spark.cassandra.output.concurrent.writes=2000
  7. num-cpu-cores=48
  8. memory-per-node=3g
  9. spark.executor.cores=3
  10. spark.cassandra.output.ignoreNulls=true
  11. spark.cassandra.output.throughput_mb_per_sec=10
  12. spark.serializer=org.apache.spark.serializer.KryoSerializer
  13. spark.cassandra.connection.local_dc=dc1
  14. spark.cassandra.connection.compression=LZ4
  15. spark.cassandra.connection.connections_per_executor_max=20
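
For reference, here is a minimal sketch of how settings like these can be applied to the SparkConf that is passed to the SparkSession builder below. The values simply mirror the list above and the app name is a placeholder:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("cassandra-write-test")   // placeholder app name
  // Cassandra connector read/write tuning from the list above
  .set("spark.cassandra.input.fetch.size_in_rows", "500")
  .set("spark.cassandra.output.concurrent.writes", "2000")
  .set("spark.cassandra.output.throughput_mb_per_sec", "10")
  .set("spark.cassandra.output.ignoreNulls", "true")
  .set("spark.cassandra.connection.keep_alive_ms", "10000")
  .set("spark.cassandra.connection.local_dc", "dc1")
  .set("spark.cassandra.connection.compression", "LZ4")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")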

Following is the sample code for both approaches:

import org.apache.spark.sql.{SaveMode, SparkSession}
import com.datastax.spark.connector._

val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext

import sparkSession.implicits._

// RDD path: read the rows for one id and write them back with saveToCassandra
val RDD1 = sc.cassandraTable[CaseClassModel]("keySpace1", "TableName")
           .where("id = ?", id)

RDD1.saveToCassandra("keySpace1", "TableName")

// Dataset path: same read through the DataFrame API, then write with SaveMode.Append
val DS1 = sparkSession.read
           .format("org.apache.spark.sql.cassandra")
           .options(Map("table" -> "TableName", "keyspace" -> "keySpace1"))
           .load()
           .where("id = '" + id + "'").as[CaseClassModel]

DS1.write.format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append).option("table", "TableName1")
  .option("keyspace", "KeySpace1")
  .save()

Upvotes: 0

Views: 914

Answers (1)

RussS

Reputation: 16576

Since both the DataFrame and RDD methods use the same underlying save code, it is unlikely that you would see such a drastic difference unless the overhead of converting into DF types were extremely high. In our own tests over billions of rows we see only a few percent difference in speed.

While 30k records is a very small amount, so almost any overhead could become relevant, I think the most likely cause is the lookup in the where clause being interpreted differently in the RDD and DF code. I would check that the filter is actually being pushed down in the DF code (see the explain output for the DF load).
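
A quick way to check (a sketch reusing the same DataFrame load as in the question; filteredDF is just an illustrative name) is to print the plan and confirm the id filter appears as a pushed predicate on the Cassandra scan rather than as a separate Filter step after it:

// Sketch: inspect the plan of the filtered load.
// A pushed-down predicate shows up on the scan node, e.g. "PushedFilters: [EqualTo(id, ...)]";
// if it instead appears as a Filter step above the scan, the whole table is being read first.
val filteredDF = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "TableName", "keyspace" -> "keySpace1"))
  .load()
  .where("id = '" + id + "'")

filteredDF.explain(true)   // extended = true prints logical and physical plans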

Upvotes: 4
