Reputation: 205
I have a list of IDs in string format; this list can be roughly 20,000 IDs long:
var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // distinct list, we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;
When I use this list against one of my Cassandra tables it works just fine, no matter the size of timelineIdsString:
var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
.select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)
When I do it against another of my tables, it never completes when I have over 1,000 IDs in the list:
var dispositionSource = sc.cassandraTable[DispositionSource]("acd", "dispositions_bytimelineid")
  .select("ownerid", "dispositionid", "month", "timelineid", "createddate", "createduserid")
if (constrain)
  dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString)
Both Cassandra tables have timelineid as the key, so I know that it's correct. This code works fine as long as timelineIds is a small list.
Is there a better way to filter from a Cassandra RDD? Is it the size of the IN clause that's causing it to choke?
Upvotes: 1
Views: 417
Reputation: 87119
Instead of performing the join at the Spark level, it's better to perform the join using Cassandra itself - in this case you'll read only the necessary data from Cassandra (provided that the join key is a partition key or primary key). For RDDs this can be done with the .joinWithCassandraTable function (doc):
import com.datastax.spark.connector._
val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
val joined = toJoin.joinWithCassandraTable("test", "jtest1")
.on(SomeColumns("pk"))
scala> joined.toDebugString
res21: String =
(8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
| ParallelCollectionRDD[147] at parallelize at <console>:33 []
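Applied to your case, a minimal sketch might look like the following (assuming the timelineIds RDD and the DispositionSource mapping from your question, and that timelineid is the partition key of dispositions_bytimelineid):
import com.datastax.spark.connector._
// wrap each id in a Tuple1 so the connector can map it to the join column
val keys = timelineIds.map(id => Tuple1(id.asInstanceOf[String]))
// reads only the matching partitions instead of a full scan + huge IN clause
val dispositionSource = keys
  .joinWithCassandraTable[DispositionSource]("acd", "dispositions_bytimelineid")
  .on(SomeColumns("timelineid"))
  .map { case (_, row) => row } // keep only the Cassandra rows
This scales with the number of keys, because each key becomes a targeted read by partition key rather than one enormous IN predicate that a single coordinator has to satisfy.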
For DataFrames it's the so-called direct join, available since SCC 2.5 (see the announcement) - you need to pass some configs to enable it, see the docs:
import spark.implicits._
import org.apache.spark.sql.cassandra._
val cassdata = spark.read.cassandraFormat("jtest1", "test").load
val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))
scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#0L as int) AS id#2]
+- *(1) Range (1, 5, step=1, splits=8)
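For reference, a minimal sketch of enabling it when building the session (based on the SCC 2.5 docs; the directJoinSetting line is optional - auto is the default):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // registers the connector's Catalyst extensions, including the direct join rule
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  // optional: on / off / auto (auto decides based on the relative size of the join sides)
  .config("spark.cassandra.sql.directJoinSetting", "auto")
  .getOrCreate()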
I have quite a long & detailed blog post about joins with Cassandra - check it for more details.
Upvotes: 1
Reputation: 11
You can instead try keeping the IDs list as a DataFrame, timelineIds, and inner-joining the table with it based on timelineid. Then remove the unnecessary column (timelineIds.timelineid) from the resulting DataFrame.
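A minimal sketch of that approach, assuming the timelineIdsString list and the table names from the question:
import org.apache.spark.sql.cassandra._
import spark.implicits._

// turn the ID list into a single-column DataFrame
val timelineIdsDf = timelineIdsString.toDF("timelineid")
val dispositions = spark.read.cassandraFormat("dispositions_bytimelineid", "acd").load()
// the inner join keeps only rows whose timelineid is in the list;
// then drop the duplicate join column coming from the ID side
val result = dispositions
  .join(timelineIdsDf, dispositions("timelineid") === timelineIdsDf("timelineid"))
  .drop(timelineIdsDf("timelineid"))
With SCC 2.5+ and the extensions enabled as described in the other answer, this join can be executed as a Cassandra direct join rather than a full table scan.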
Upvotes: 1