Reputation: 11
We have a program called cassandra-scan which uses spark-cassandra-connector to list all the values of the partition key in a very large table. The table has around 17 million Cassandra partitions, and each partition has an average of 200 rows. The Cassandra cluster housing this table runs DSE 5.1.8 on 6 nodes. The replication factor for the keyspace containing the table is 3.
Here are simplified definitions of the keyspace and table.
CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;
CREATE TABLE myspace.largetable (
    id text,
    itemOrder text,
    ...
    PRIMARY KEY (id, itemOrder)
) WITH CLUSTERING ORDER BY (itemOrder ASC);
The statement used in cassandra-scan to list all the values of the partition key is as follows:
val res = sc.cassandraTable(keyspace, table).select("id").perPartitionLimit(1).repartition(320)
We use Apache Spark 2.3.1 and spark-cassandra-connector 2.3.2. The command used to launch cassandra-scan is as follows.
/path/to/spark/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class "CassandraScan" --jars /path/to/spark-cassandra-connector_2.11-2.3.2.jar --executor-memory 15g --master local[20] cassandra-scan.jar &
cassandra-scan runs correctly and takes around 19 hours.
We recently set up a new Cassandra cluster, again with 6 nodes (different from those used in the first cluster). This cluster runs DSE 6.8.16. All the data from the first table has been added to a table in the new cluster.
We updated Apache Spark to 2.4.8 and spark-cassandra-connector to 2.4.2, and tested the program with the number of Spark partitions ranging from 2,000 to 200,000. We have not been able to get cassandra-scan to run correctly against the new cluster. We see errors of the following form:
java.io.IOException: Exception during execution of SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ? AND token("id") <= ? PER PARTITION LIMIT 1 ALLOW FILTERING: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
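For context, the two `?` placeholders in this query are the bounds of one token range: the scan walks the table slice by slice rather than in a single query. The sketch below is only an illustration of that idea, assuming the default Murmur3Partitioner (tokens are signed 64-bit values). `TokenRangeSketch`, `split` and `cql` are hypothetical names, and the even split is not the connector's actual algorithm, which derives its splits from the cluster's token ranges and size estimates.

```scala
// Illustrative only: evenly cut the Murmur3 token ring into (start, end] slices.
// This is NOT how spark-cassandra-connector computes its splits.
object TokenRangeSketch {
  // Assumption: Murmur3Partitioner, whose tokens span the signed 64-bit range.
  val MinToken: BigInt = BigInt(Long.MinValue)
  val MaxToken: BigInt = BigInt(Long.MaxValue)

  /** Cut the ring into `n` contiguous (start, end] ranges. */
  def split(n: Int): Seq[(Long, Long)] = {
    require(n > 0, "n must be positive")
    val width = MaxToken - MinToken
    // Boundaries b(0) = MinToken ... b(n) = MaxToken; BigInt avoids Long overflow.
    val bounds = (0 to n).map(i => MinToken + width * i / n)
    bounds.zip(bounds.tail).map { case (lo, hi) => (lo.toLong, hi.toLong) }
  }

  /** Render the CQL that one slice would run (hypothetical helper). */
  def cql(range: (Long, Long)): String =
    s"""SELECT "id" FROM "myspace"."largetable" WHERE token("id") > ${range._1} """ +
      s"""AND token("id") <= ${range._2} PER PARTITION LIMIT 1 ALLOW FILTERING"""
}
```

For example, `TokenRangeSketch.split(4).map(TokenRangeSketch.cql)` yields four queries whose ranges meet end to start. A failure on any one slice surfaces as an exception like the one above.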
Some runs of cassandra-scan also brought down some of the Cassandra nodes, with messages such as the following in the Cassandra logs:
INFO [CoreThread-22] 2022-04-03 06:26:35,467 InboundHandshakeHandler.java:353 - Failed to properly handshake with peer /xxx.xxx.xxx.xxx:41231. Closing the channel.
java.lang.OutOfMemoryError: Direct buffer memory
WARN [Outbound-/xxx.xxx.xxx.xxx-large-message-writer] 2022-04-01 19:17:58,248 AbstractOutboundMessageHandler.java:80 - LARGE_MESSAGE with id 97 from /xxx.xxx.xxx.xxx to /xxx.xxx.xxx.xxx via (/xxx.xxx.xxx.xxx,/xxx.xxx.xxx.xxx:7000) error...
java.io.IOException: java.lang.RuntimeException: io.netty.channel.unix.Errors$NativeIoException: writeAddress(..) failed: Connection reset by peer
Any help with getting this to work is much appreciated. Thanks.
Upvotes: 0
Views: 1137
Reputation: 11
We worked around the problem by using DataStax Bulk Loader (dsbulk) to list the partition keys:
dsbulk unload \
--connector.csv.url <path>/<to>/<outputDir> \
-h <host> \
-query "select distinct id from myspace.largetable"
dsbulk took around 3 hours to obtain 17.5 million values.
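As a rough comparison, the rounded figures quoted in this thread work out to about 250 partition keys per second for the original Spark scan and about 1,600 per second for dsbulk. The small sketch below just reproduces that arithmetic; `ScanThroughput` and `keysPerSecond` are hypothetical names. The two runs were against different clusters, so this is not a like-for-like benchmark.

```scala
object ScanThroughput {
  /** Partition keys listed per second, given a key count and a duration in hours. */
  def keysPerSecond(keys: Long, hours: Double): Double = keys / (hours * 3600.0)

  def main(args: Array[String]): Unit = {
    // Rounded figures quoted in this thread, not measurements of ours.
    val spark  = keysPerSecond(17000000L, 19.0) // Spark scan on the old cluster
    val dsbulk = keysPerSecond(17500000L, 3.0)  // dsbulk unload
    println(f"Spark scan: $spark%.0f keys/s, dsbulk: $dsbulk%.0f keys/s, ratio: ${dsbulk / spark}%.1f")
  }
}
```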
Upvotes: 1
Reputation: 16303
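This error means that the coordinator saw no live replica for the token range being queried. With a replication factor of 3, that implies several nodes in the cluster were down or unresponsive at the time: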
Not enough replicas available for query at consistency LOCAL_ONE \
(1 required but only 0 alive)
You need to review the Cassandra logs to determine (1) which of the nodes was unresponsive/unavailable, and (2) why. Cheers!
Upvotes: 0