Saravanan

Reputation: 127

Keeping data together in spark based on cassandra table partition key

When loading data from a Cassandra table, a Spark partition holds all rows with the same Cassandra partition key. However, when I create data in Spark with the same partition key and repartition the new RDD using the .repartitionByCassandraReplica(..) method, those rows end up in a different Spark partition. How do I get consistent Spark partitions using the partitioning scheme defined by the Spark-Cassandra connector?

Links to download CQL and Spark job code that I tested

Version and other information

Code extract (download the full code from the links above for more details)
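For context, the extracts below assume the connector's Java helpers are statically imported and that conf refers to the Spark context (javaFunctions takes a context, not a SparkConf). A minimal setup sketch, with placeholder app name and host; the real values are in the linked job code:

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.someColumns;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Placeholder setup; the actual configuration is in the linked download
    SparkConf sparkConf = new SparkConf()
            .setAppName("PartitionTest")
            .set("spark.cassandra.connection.host", "127.0.0.1");
    JavaSparkContext conf = new JavaSparkContext(sparkConf); // the 'conf' passed to javaFunctions(conf) below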

Step 1: Load the table data into 8 Spark partitions

// 'map' optionally maps TestTable field names to Cassandra column names
Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
        .cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));

Step 2: Repartition the new data into 8 partitions (see the full-call sketch after the extract)

.repartitionByCassandraReplica(
        "testkeyspace",
        "testtable",
        partitionNumPerHost,
        someColumns("id"),
        mapToRow(TestTable.class, map));
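For reference, the fragment above is normally chained off the new RDD via the connector's Java helper. A minimal sketch, where newDataRdd stands for the RDD created in Spark and the partitionNumPerHost value is illustrative (neither is taken from the question):

    // 'newDataRdd' is assumed to be the JavaRDD<TestTable> built in Spark with the
    // same partition keys; 'map' is the same column mapping used in Step 1.
    int partitionNumPerHost = 2; // illustrative value only
    JavaRDD<TestTable> repartitionedRdd = javaFunctions(newDataRdd)
            .repartitionByCassandraReplica(
                    "testkeyspace",
                    "testtable",
                    partitionNumPerHost,
                    someColumns("id"),               // partition key column of testtable
                    mapToRow(TestTable.class, map));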

Step 3: Print the partition id and values for both RDDs

// Tag each row with the index of the Spark partition that holds it
// (Function2 comes from org.apache.spark.api.java.function)
rdd.mapPartitionsWithIndex(
    new Function2<Integer, Iterator<TestTable>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer integer, Iterator<TestTable> itr) throws Exception {
            List<String> list = new ArrayList<String>();
            list.add("PartitionId-" + integer);

            while (itr.hasNext()) {
                TestTable value = itr.next();
                list.add(Integer.toString(value.getId()));
            }
            return list.iterator();
        }
    }, true).collect();

Step 4: Snapshot of the results printed for partition 1. The values differ between the two RDDs, but I expected them to match.

Load Rdd values

----------------------------
Table load - PartitionId -1
----------------------------
15
22

--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16

Upvotes: 1

Views: 1459

Answers (1)

RussS

Reputation: 16576

repartitionByCassandraReplica does not place keys deterministically. There is currently an open ticket to change that:

https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278

A workaround for now is to set the partitions-per-host parameter (partitionNumPerHost in your code) to 1.
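In terms of the question's extract, that workaround looks roughly like this (a sketch; newDataRdd again stands for the RDD created in Spark):

    // With one Spark partition per host, every row routed to a given replica host
    // lands in that host's single partition, so rows sharing a Cassandra partition
    // key are kept together.
    JavaRDD<TestTable> repartitionedRdd = javaFunctions(newDataRdd)
            .repartitionByCassandraReplica(
                    "testkeyspace",
                    "testtable",
                    1,                               // partitions per host = 1
                    someColumns("id"),
                    mapToRow(TestTable.class, map));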

Upvotes: 2
