ktzan

Reputation: 522

Spark-Cassandra Connector -- Spark and Cassandra partitions -- data locality

I have a 16-node cluster where every node has both Spark and Cassandra installed, and I am using the Spark-Cassandra Connector 3.0.0. The Spark cluster has 16 executors with 2 cores each, for a total of 32 cores. I have ~2.2 billion rows (which are also the primary keys) in the Cassandra database, with 4,827 unique partition keys in total. I am using DataFrames/Datasets, the code is in Java, and I set .config("spark.sql.shuffle.partitions", 96) in the Spark configuration. In the code I select all 2.2 billion rows and join on the partition key.
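
For reference, this is roughly how my session is set up (the app name, master URL, and contact point below are placeholders, not my actual values):

    import org.apache.spark.sql.SparkSession;

    SparkSession sp = SparkSession.builder()
            .appName("experiment-join")                              // placeholder
            .master("spark://master:7077")                           // placeholder
            .config("spark.sql.shuffle.partitions", 96)              // as mentioned above
            .config("spark.cassandra.connection.host", "127.0.0.1")  // placeholder contact point
            .getOrCreate();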

  1. In the Spark UI I see a broadcast with 32 tasks, which means Spark's join is used and the 32 tasks correspond to the available cores. Does this mean that 32 Spark partitions will be created initially, and that these 2.2 billion rows will reside in them?

  2. Should I definitely use .repartitionByCassandraReplica before the join? I am not convinced it is needed, but the truth is that if I try to use it I get a "cannot find symbol" compile error. Also, DirectJoin is only activated when I have fewer than 2,600 partition keys.

My aim is to take advantage of data locality and avoid data transfer.

EDIT 1

For question 1, I went through the link you sent and, as you say, the size is based on whatever is in the system.size_estimates table.

For question 2, my Cassandra table is:

 CREATE TABLE experiment (
     experimentid varchar,
     description text,
     rt float,
     intensity float,
     mz float,
     identifier text,
     chemical_formula text,
     filename text,
     PRIMARY KEY ((experimentid), description, rt, intensity, mz, identifier, chemical_formula, filename)
 );

and the spark code is:

Dataset<Row> dfexplist = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid");

Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mdb");
                        put("table", "experiment");
                    }
                })
                .load()
                .select(col("experimentid"), col("description"), col("intensity"))
                .join(dfexplist, "experimentid")
                .repartition(col("experimentid"));

Does this achieve data locality? Is there a shuffle happening during or before the join? At the end I repartition by the partition key so as to avoid any future shuffles in later calculations.

Upvotes: 1

Views: 692

Answers (1)

Erick Ramirez

Reputation: 16313

For question 1, the number of Spark partitions doesn't have a direct correlation with the number of cores or tasks. The connector calculates the Spark partitions from the estimated table size (taken from the Cassandra system.size_estimates table) and the input split size. The formula is:

spark_partitions = estimated_table_size / input.split.size_in_mb
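
For illustration only (these numbers are not from your cluster): if input.split.size_in_mb were set to 64 and the estimated table size were 6,400 MB, the connector would aim for 6400 / 64 = 100 Spark partitions.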

If you'd like to know the details, I've explained it in https://community.datastax.com/questions/11500/.

For question 2, it is definitely a good idea to use the repartitionByCassandraReplica() method to take advantage of data locality and minimise shuffling. However, I'm not sure why you're getting that error. If you update your original question with minimal code + data that replicates the issue, I'd be happy to review it and update my answer accordingly. Cheers!
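
One thing worth checking: in the connector's Java API, repartitionByCassandraReplica() is exposed on RDDs through CassandraJavaUtil rather than on Datasets, so calling it on a Dataset<Row> won't compile. As a rough sketch only (the ExperimentKey bean and the partitions-per-host value are placeholders, not from your code), the call would look something like this:

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.someColumns;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import java.io.Serializable;

    // Hypothetical bean that carries the partition key.
    public class ExperimentKey implements Serializable {
        private String experimentid;
        public ExperimentKey() { }
        public ExperimentKey(String experimentid) { this.experimentid = experimentid; }
        public String getExperimentid() { return experimentid; }
        public void setExperimentid(String experimentid) { this.experimentid = experimentid; }
    }

    JavaSparkContext jsc = new JavaSparkContext(sp.sparkContext());

    // Build an RDD of partition keys, then group it by Cassandra replica.
    JavaRDD<ExperimentKey> keys = jsc.parallelize(experimentlist).map(ExperimentKey::new);
    JavaRDD<ExperimentKey> byReplica = javaFunctions(keys).repartitionByCassandraReplica(
            "mdb",                        // keyspace
            "experiment",                 // table
            10,                           // partitions per host -- tune for your cluster
            someColumns("experimentid"),  // partition key column(s)
            mapToRow(ExperimentKey.class));

From there, joinWithCassandraTable() on the repartitioned RDD is the usual way to keep the join replica-local.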

Upvotes: 1
