Reputation: 1615
I am joining a Spark RDD with a Cassandra table (as a lookup), but I am not able to understand a few things:
Will Spark pull the whole Cassandra table and then join it with the RDD in Spark memory, or will it push the values from the RDD down to Cassandra and perform the join there (i.e. does the join happen in Cassandra or Spark)?
Why does Spark always pull the same number of records from Cassandra no matter what limit is applied (1 or 1000)? Code below:
import com.datastax.spark.connector._   // needed for joinWithCassandraTable and SomeColumns

// creating a dataframe with the fields required for the join with the Cassandra table
// and converting it to an RDD
val df_for_join = src_df.select(src_df("col1"), src_df("col2"))
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("my_keyspace", "my_table",
    selectedColumns = SomeColumns("col1", "col2", "col3", "col4"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > 'range_start' and created_at <= 'range_end'")
  .clusteringOrder(Ascending)
  .limit(1)
Cassandra table details -
PRIMARY KEY ((col1, col2), created_at) WITH CLUSTERING ORDER BY (created_at ASC)
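For context, a table matching this primary key could be created roughly as follows (a sketch only: the column types are assumptions, not taken from the actual schema, and it assumes a spark-shell-style sc is available):
import com.datastax.spark.connector.cql.CassandraConnector

// Sketch of a table matching the stated PRIMARY KEY; column types are assumed.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute(
    """CREATE TABLE IF NOT EXISTS my_keyspace.my_table (
      |  col1       text,
      |  col2       text,
      |  created_at timestamp,
      |  col3       text,
      |  col4       text,
      |  PRIMARY KEY ((col1, col2), created_at)
      |) WITH CLUSTERING ORDER BY (created_at ASC)""".stripMargin)
}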
Upvotes: 0
Views: 1220
Reputation: 87119
joinWithCassandraTable extracts the partition/primary key values from the passed RDD and converts them into individual requests against the corresponding partitions in Cassandra. On top of that, SCC may apply additional filtering, such as your where condition. If I remember correctly (but I could be wrong), the limit won't be pushed down to Cassandra completely - it may still fetch up to limit rows per partition.
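In other words, for every distinct (col1, col2) pair coming from the RDD, the connector issues a targeted request against that partition, roughly a per-key SELECT with your range predicate and the limit attached. If what you actually need is one row overall rather than up to one row per key, you can apply that final cut on the Spark side; a minimal sketch, assuming the result_rdd from the question:
// Conceptually, per (col1, col2) key taken from the RDD the connector runs something like:
//   SELECT col1, col2, col3, col4 FROM my_keyspace.my_table
//   WHERE col1 = ? AND col2 = ?
//     AND created_at > 'range_start' AND created_at <= 'range_end'
//   LIMIT 1
// so .limit(1) may cap rows per key rather than the overall result.

// Hypothetical follow-up: enforce a single overall row on the Spark side.
val firstRowOverall = result_rdd
  .map { case (_, cassandraRow) => cassandraRow } // keep only the Cassandra side of the pair
  .take(1)                                        // global limit, applied in the driver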
You can always check where the join happens by executing result_rdd.toDebugString. For my code:
val df_for_join = Seq((2, 5), (5, 2)).toDF("col1", "col2")
val rdd_for_join = df_for_join.rdd

val result_rdd = rdd_for_join
  .joinWithCassandraTable("test", "jt",
    selectedColumns = SomeColumns("col1", "col2", "v"),
    joinColumns = SomeColumns("col1", "col2"))
  .where("created_at > '2020-03-13T00:00:00Z' and created_at <= '2020-03-14T00:00:00Z'")
  .limit(1)
it gives the following:
scala> result_rdd.toDebugString
res7: String =
(2) CassandraJoinRDD[14] at RDD at CassandraRDD.scala:19 []
| MapPartitionsRDD[2] at rdd at <console>:45 []
| MapPartitionsRDD[1] at rdd at <console>:45 []
| ParallelCollectionRDD[0] at rdd at <console>:45 []
while if you do a "normal" join, you'll get the following:
scala> val rdd1 = sc.parallelize(Seq((2, 5),(5, 2)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:44
scala> val ct = sc.cassandraTable[(Int, Int)]("test", "jt").select("col1", "col2")
ct: com.datastax.spark.connector.rdd.CassandraTableScanRDD[(Int, Int)] = CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19
scala> rdd1.join(ct)
res15: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[34] at join at <console>:49
scala> rdd1.join(ct).toDebugString
res16: String =
(6) MapPartitionsRDD[37] at join at <console>:49 []
| MapPartitionsRDD[36] at join at <console>:49 []
| CoGroupedRDD[35] at join at <console>:49 []
+-(3) ParallelCollectionRDD[21] at parallelize at <console>:44 []
+-(6) CassandraTableScanRDD[31] at RDD at CassandraRDD.scala:19 []
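If you don't want to eyeball the lineage, a rough programmatic check is also possible (a sketch; the helper below is just for illustration): the pushed-down join shows up as a CassandraJoinRDD at the top of the lineage, while the plain RDD join goes through a CoGroupedRDD over a full CassandraTableScanRDD.
// Illustration only: inspect the lineage string to see where the join happens.
def joinPushedDown(rdd: org.apache.spark.rdd.RDD[_]): Boolean =
  rdd.toDebugString.contains("CassandraJoinRDD")

println(joinPushedDown(result_rdd))    // true  -> per-partition-key requests to Cassandra
println(joinPushedDown(rdd1.join(ct))) // false -> full table scan co-grouped in Spark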
More information is available in the corresponding section of the SCC documentation.
Upvotes: 2