David Semeria

Reputation: 542

Distributed loading of a wide row into Spark from Cassandra

Let's assume we have a Cassandra cluster with RF = N and a table containing wide rows.

Our table could have a primary key something like this: pk / ck1 / ck2 / ...., where pk is the partition key and ck1, ck2, ... are clustering keys.

If we create an RDD from a row in the table as follows:

val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK)

I notice that one Spark node has 100% of the data and the others have none. I assume this is because the spark-cassandra-connector has no way of breaking down the query token range into smaller sub ranges because it's actually not a range -- it's simply the hash of PK.

At this point we could simply call repartition(N) to spread the data across the Spark cluster before processing, but this has the effect of moving data across the network to nodes that already have the data locally in Cassandra (remember RF = N).

What we would really like is to have each Spark node load a subset (slice) of the row locally from Cassandra.

One approach which came to mind is to generate an RDD containing the distinct values of the first clustering key (ck1) for pk = PK. We could then use mapPartitions() to load a slice of the wide row for each value of ck1.

Assuming we already have our list values for ck1, we could write something like this:

val ck1_list = ....  // RDD

// repartition returns a new RDD, so keep the result;
// this asks for as many partitions as there are values of ck1
val ck1_parts = ck1_list.repartition(ck1_list.count().toInt)

val wide_row = ck1_parts.mapPartitions(f)

Within the partition iterator, f(), we would like to call another function g(pk, ck1) which loads the row slice from Cassandra for partition key pk and clustering key ck1. Flattening the slices returned for each ck1 value would then give a fully distributed RDD of the wide row without any shuffling.

So here's the question:

Is it possible to make a CQL call from within a Spark task? What driver should be used? Can it be set up only once and reused for subsequent tasks?

Any help would be greatly appreciated, thanks.

Upvotes: 1

Views: 1676

Answers (1)

David Semeria

Reputation: 542

For the sake of future reference, I will explain how I solved this.

I actually used a slightly different method to the one outlined above, one which does not involve calling Cassandra from inside Spark tasks.

I started off with ck_list, a list of distinct values for the first cluster key when pk = PK. The code is not shown here, but I actually downloaded this list directly from Cassandra in the Spark driver using CQL.
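The driver-side lookup is not shown in this answer. As a rough sketch only, and not the code actually used, it could look something like the following. It assumes the connector's CassandraConnector is available, that pk and ck1 are both text columns, and that de-duplicating on the client is acceptable; CkList and fetch are hypothetical names.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object CkList {
  // Hypothetical helper: runs on the Spark driver, reads the ck1 column of one
  // Cassandra partition and de-duplicates the values client-side.
  // Assumption: pk and ck1 are text columns named "pk" and "ck1".
  def fetch(sc: SparkContext, ks: String, table: String, pk: String): Seq[String] =
    CassandraConnector(sc.getConf).withSessionDo { session =>
      val rows = session.execute(s"SELECT ck1 FROM $ks.$table WHERE pk = ?", pk)
      rows.all().asScala.map(_.getString("ck1")).distinct.toList
    }
}
```

With this in place, something like val ck_list = CkList.fetch(sc, KS, TBL, PK) would yield the list used below. Note that this reads the ck1 column for the whole wide row, so it is only reasonable when that column is small compared with the row itself.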

I then transformed ck_list into a list of RDDs, each one representing a Cassandra row slice, and combined them into one unified RDD (wide_row).

The cast from CassandraRDD is necessary because union returns the more general type org.apache.spark.rdd.RDD, so the elements being reduced have to be typed as RDD as well.

After running the job I was able to verify that the wide_row had x partitions where x is the size of ck_list. A useful side effect is that wide_row is partitioned by the first cluster key, which is also the key I want to reduce by. Hence even more shuffling is avoided.

I don't know if this is the best way to achieve what I wanted, but it certainly works.

import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD

val ck_list = ....  // list of first clustering key values where pk = PK

val wide_row = ck_list.map( ck =>
  sc.cassandraTable(KS, TBL)
    .select("c1", "c2").where("pk = ? and ck1 = ?", PK, ck)
    .asInstanceOf[RDD[CassandraRow]] // RDD needs its element type parameter
).reduce( (x, y) => x.union(y) )
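The pairwise reduce over union above nests one UnionRDD inside another for every slice. A possible alternative, sketched here rather than taken from the answer, is SparkContext.union, which accepts the whole sequence of RDDs at once. The example uses a local Spark context and sc.parallelize as a stand-in for the per-slice Cassandra queries so that it runs without a cluster; UnionSketch and unionSlices are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object UnionSketch {
  // Builds one RDD per key with the caller-supplied function,
  // then merges all of them in a single union step.
  def unionSlices[K, T: ClassTag](sc: SparkContext, keys: Seq[K])(slice: K => RDD[T]): RDD[T] =
    sc.union(keys.map(slice))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("union-sketch"))
    try {
      // Stand-in slices: each key yields a single-partition RDD,
      // similar to a query restricted to one partition key.
      val unified = unionSlices(sc, Seq("a", "b", "c"))(k => sc.parallelize(Seq(k), numSlices = 1))
      println(unified.getNumPartitions) // one partition per slice
    } finally sc.stop()
  }
}
```

In the Cassandra case the slice function would be the cassandraTable(...).select(...).where(...) call from the snippet above, with the result typed as RDD[CassandraRow].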

Upvotes: 4
