vidya

Reputation: 11

How do you list all the partition keys in a large Cassandra table using Spark?

We have a program called cassandra-scan which uses spark-cassandra-connector to list all the values of the partition key in a very large table. The table has around 17 million Cassandra partitions, and each partition has an average of 200 rows. The Cassandra cluster housing this table runs DSE 5.1.8 on 6 nodes. The replication factor for the keyspace containing the table is 3.

Here are simplified definitions of the keyspace and table.

CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;

CREATE TABLE myspace.largetable (
    id text,
    itemOrder text,
    ...
    PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC);

The statement used in cassandra-scan to list all the values of the partition key is as follows:

val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)

We use Apache Spark 2.3.1 and spark-cassandra-connector 2.3.2. The command used to launch cassandra-scan is as follows.

/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &

cassandra-scan runs correctly and takes around 19 hours.

We recently set up a new Cassandra cluster, again with 6 nodes (different from those used in the first cluster). This cluster runs DSE 6.8.16. All the data from the first table has been added to a table in the new cluster.

We updated Apache Spark to 2.4.8 and spark-cassandra-connector to 2.4.2. We tested the program with the number of Spark partitions ranging from 2,000 to 200,000, but we haven't been able to get cassandra-scan to run correctly. We see errors of the following form:

java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ?  PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)

Some runs of cassandra-scan caused Cassandra nodes to go down, with messages such as the following in the Cassandra logs:

INFO  [CoreThread-22] 2022-04-03 06:26:35,467  InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN  [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248  AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer

Any help with getting this to work is much appreciated. Thanks.

Upvotes: 0

Views: 1137

Answers (2)

vidya

Reputation: 11

We used DataStax Bulk Loader to solve the problem.

dsbulk unload \
  --connector.csv.url <path>/<to>/<outputDir> \
  -h <host> \
  -query "select distinct id from myspace.largetable"

dsbulk took around 3 hours to obtain 17.5 million values.
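
To sanity-check the unload, something like the following could count the distinct ids across the output files. This is only a sketch: it assumes dsbulk wrote files matching output-*.csv into the output directory, each starting with a one-line header (I believe both are the defaults, but check your settings). The function name count_ids is made up for illustration.

```shell
# Hypothetical helper: count distinct ids across dsbulk CSV output files.
# Assumes every file in the directory matches output-*.csv and begins with
# a single header line, which is skipped before counting.
count_ids() {
  dir="$1"
  for f in "$dir"/output-*.csv; do
    tail -n +2 "$f"          # drop the header line of each file
  done | sort -u | wc -l | tr -d ' '
}

# Example: count_ids <path>/<to>/<outputDir>
```

The result should match the number of partitions you expect in the table.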

Upvotes: 1

Erick Ramirez

Reputation: 16303

This error indicates that none of the replicas for the requested token range were available to serve the request:

    Not enough replicas available for query at consistency LOCAL_ONE \
      (1 required but only 0 alive)

You need to review the Cassandra logs to determine (1) which of the nodes was unresponsive/unavailable, and (2) why. Cheers!
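
As a starting point, nodetool status shows which nodes the cluster currently considers up (UN) or down (DN). For the logs, a rough sketch like the one below pulls out the failure signatures quoted in the question. The function name scan_log and the log path in the example are assumptions; adjust both to your installation.

```shell
# Hypothetical helper: print log lines (with line numbers) that match the
# failure signatures quoted in the question.
scan_log() {
  grep -n -E 'OutOfMemoryError|Connection reset by peer|Failed to properly handshake' "$1"
}

# Example (the path is an assumption):
# scan_log /var/log/cassandra/system.log
```

Running it on each node and comparing timestamps should help narrow down which node failed first.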

Upvotes: 0
