Reputation: 499
I'm trying to make writes through Spark. I have 6 nodes in my cluster and in it I made keyspace in which I want to write data:
CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '2'} AND durable_writes = true;
When I'm trying to write from Spark, I'm getting this kind of error:
16/08/17 16:14:57 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@7409fd2d
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency ONE (1 required but only 0 alive)
This is snippet of code what am I doing exactly:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType, IntegerType};
object ff {
def main(string: Array[String]) {
val conf = new SparkConf()
.set("spark.cassandra.connection.host", "127.0.0.1")
.set("spark.cassandra.connection.host","ONE")
.setMaster("local[4]")
.setAppName("ff")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true")
.load("test.csv")
df.registerTempTable("ff_table")
//df.printSchema()
df.count
time {
df.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
.save()
}
def time[A](f: => A) = {
val s = System.nanoTime
val ret = f
println("time: " + (System.nanoTime - s) / 1e6 + "ms")
ret
}
}
}
Also, if I run nodetool describecluster
I got this results:
Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]
I tried to insert in CLI on row for replication_factor
:2 and it's working, so every node can see each other.
Why Spark can't insert anything than, why nodes can't see each other while trying to insert data from Spark, anyone idea?
Upvotes: 1
Views: 460
Reputation: 16576
It looks like you are running 6 nodes on one machine via loopback. This means there is a rather likely chance that the resources of this machine are being over subscribed. The various Cassandra instances are most likely taking turns or swapping which is causing them to go missing when under heavy load. Increasing the replication factor increases the chance that a valid target is up but will increase load even further.
C* requires at it's core several different resources from your system, if any of these become a bottleneck any one these there is a chance that a node will not respond to gossip in sufficent time.
These resources are RAM - How much memory the JVM is able to acquire, this is affected by OS swap as well. This means if you allocate a large JVM but the OS swaps it to disk, you are likely to see massive performance issues. With multiple nodes on the same machine you need to make sure there is ample ram for the JVM of every node you are startin. In addition if any one instance's JVM is getting to close to full you will enter GC and possibly a GC Storm which will basically lock up that instance. Many of these details will be clear in the system.log.
CPU - Without exclusive access to at least one cpu you are almost guaranteed to have some important threads in C* scheduled with a long delay between them. This can cause gossip threads to be ignored and gossip to fail. This will give some nodes a view of a cluster with failed machines and cause unavailable errors.
DISK - Every Cassandra instance will maintain it's own CommitLog and HD files. The commit log flushes every 10 seconds and if you have multiple instances and only 1 harddrive the flushes between the commitlog and normal memtables can easily block one another. This is further compounded by compaction which requires another large amount of IO.
NETWORK - Although this isn't an issue with multiple nodes on the same machine.
In summation, It is important to make sure the resources allocated to your C* instances are small enough that no instance will overrun the space/ram/cpu of another. If you do so you will end up with a cluster whose communication fails when under load because one of the above resources is bottlenecked. This doesn't mean it's impossible to run multiple nodes on the same machine but does mean you must take care in provisioning. You can also attempt to lessen the load by throttling your write speed which will give the nodes less of a chance of clobbering one another.
Upvotes: 2