Reputation: 187
I am trying to use Spark to process a large Cassandra table (~402 million rows, 84 columns), but I am getting inconsistent results. The initial requirement was to copy some columns from this table to another table. After copying the data, I noticed that some entries were missing from the new table. To verify this I took a count of the large source table, but I get a different value each time. I tried the same queries on a smaller table (~7 million rows) and the results were consistent.
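For reference, the copy itself was done along these lines (a minimal sketch; the destination table and the column names here are hypothetical, while the source keyspace/table names are taken from the sstable path in the log below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Datacopy App").getOrCreate()

# Read the source table (keyspace/table names as in the log below; columns are placeholders)
src = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="sensordata1", keyspace="datakeyspace") \
    .load()

# Select the required columns and append them to the (pre-created) destination table
src.select("id", "ts", "value") \
    .write.format("org.apache.spark.sql.cassandra") \
    .options(table="sensordata1_copy", keyspace="datakeyspace") \
    .mode("append") \
    .save()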
Initially, I attempted to take the count using PySpark. Here is my PySpark script:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Datacopy App").getOrCreate()

# sourcetable and sourcekeyspace are defined earlier in the script
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache()
df.createOrReplaceTempView("data")

query = "select count(1) from data"
vgDF = spark.sql(query)
vgDF.show(10)
The spark-submit command is as follows:
~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py
The spark-submit job above takes ~90 minutes to complete. I ran it three times and here are the counts I got:
Spark does not report any error or exception during the entire process. I ran the same query in cqlsh three times and again got different results:
I am unable to figure out why I am getting different results from the same query. The Cassandra system log (/var/log/cassandra/system.log) showed the following error message just once:
ERROR [SSTableBatchOpen:3] 2018-02-27 09:48:23,592 CassandraDaemon.java:226 - Exception in thread Thread[SSTableBatchOpen:3,5,main]
java.lang.AssertionError: Stats component is missing for sstable /media/db/datakeyspace/sensordata1-acfa7880acba11e782fd9bf3ae460699/mc-58617-big
at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:460) ~[apache-cassandra-3.9.jar:3.9]
at org.apache.cassandra.io.sstable.format.SSTableReader.open(SSTableReader.java:375) ~[apache-cassandra-3.9.jar:3.9]
at org.apache.cassandra.io.sstable.format.SSTableReader$4.run(SSTableReader.java:536) ~[apache-cassandra-3.9.jar:3.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Versions:
- Spark 2.1.0
- Cassandra 3.9
- spark-cassandra-connector 2.0.1-s_2.11
- Java 1.8.0_131
Cluster:
Any help will be greatly appreciated.
Upvotes: 3
Views: 2516
Reputation: 1407
The Spark Cassandra connector's default read consistency level is "LOCAL_ONE" and its default write consistency level is "LOCAL_QUORUM", so with those defaults it is possible to read partial data before a full repair has run. A read at ONE may be served by the node that failed to write the data, and the write was not reported as an error because the other two replicas succeeded. So you should either set BOTH levels to QUORUM, or set one of them to ALL:
config("spark.cassandra.input.consistency.level", "LOCAL_QUORUM").
config("spark.cassandra.output.consistency.level", "LOCAL_QUORUM").
The default consistency level in the CQL shell is also ONE, so you should raise it there as well:
cqlsh> CONSISTENCY QUORUM
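Running CONSISTENCY with no argument in cqlsh prints the current level, so you can confirm the change before re-running the count.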
Upvotes: 6