Naresh

Reputation: 5397

Is it feasible to save data from Spark Streaming to Cassandra directly from the Spark workers?

The StreamingContext is created on the driver and can't be serialized, so we cannot access it on the workers. To make use of Spark's distributed nature, the data/stream is processed on the workers. Hence, it makes sense to save the data directly from the workers rather than collecting it all on the driver, which might cause an OutOfMemoryError.

In my scenario I need to apply some business logic and finally store the data in Cassandra. How do I do that?

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // some logic, and finally store the data in Cassandra
  }
}
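A dependency-free sketch of the per-partition pattern above, which is the same one the Spark Streaming programming guide recommends for `foreachRDD`: create the connection inside `foreachPartition`, on the worker, not on the driver. `FakeSession`, `writePartition`, and the sample data are hypothetical stand-ins; with the spark-cassandra-connector a real session would typically come from its serializable `CassandraConnector` (`withSessionDo`), or you would simply call `saveToCassandra`. What it shows: each partition opens exactly one session and writes its own rows, so nothing has to be collected on the driver.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PerPartitionWriter {

    /** Hypothetical stand-in for a Cassandra session; a real one would come from the connector. */
    static final class FakeSession implements AutoCloseable {
        static int opened = 0;                       // total sessions created (one per partition)
        final List<String> written = new ArrayList<>();
        boolean closed = false;

        FakeSession() { opened++; }

        void insert(String row) { written.add(row); }

        @Override
        public void close() { closed = true; }
    }

    /**
     * What the foreachPartition body does: open one session for the whole
     * partition, apply the business logic to every record, write, then close.
     */
    static FakeSession writePartition(Iterator<String> partition,
                                      Function<String, String> businessLogic) {
        try (FakeSession session = new FakeSession()) {
            while (partition.hasNext()) {
                session.insert(businessLogic.apply(partition.next()));
            }
            return session;                          // closed automatically on return
        }
    }

    public static void main(String[] args) {
        // One micro-batch split into two partitions, as foreachRDD might see it.
        List<List<String>> partitions = List.of(List.of("a", "b"), List.of("c"));
        List<String> stored = new ArrayList<>();
        for (List<String> partition : partitions) {
            stored.addAll(writePartition(partition.iterator(), String::toUpperCase).written);
        }
        System.out.println(FakeSession.opened + " sessions, stored " + stored);
    }
}
```

Running `main` prints `2 sessions, stored [A, B, C]`: one session per partition rather than one per record, so connection overhead grows with the number of partitions, not the number of records.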

Upvotes: 1

Views: 386

Answers (2)

Sreekar

Reputation: 1025

import org.apache.spark.SparkConf

val sparkConf = new SparkConf().setAppName("Test App")
sparkConf.set("spark.cassandra.connection.host", "X.X.X.X")
sparkConf.set("spark.cassandra.auth.username", "xxxxx")
sparkConf.set("spark.cassandra.auth.password", "xxxxx")

SparkConf holds the configuration for the context, including the Cassandra connection settings, so set them there.

The StreamingContext wraps a SparkContext, so you don't really have to worry about it. If you need the SparkContext or the current SparkConf, you can get them like this:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.sparkContext          // your SparkContext object
ssc.sparkContext.getConf  // a copy of your SparkConf

As for saving to Cassandra, it can be done like this:

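import com.datastax.spark.connector._

// messages is the InputDStream in this example
messages.foreachRDD { rdd =>
  // Write business logic here
  rdd.saveToCassandra("keyspace_name", "table_name")
}

Although the foreachRDD function itself runs on the driver, saveToCassandra writes each partition from the executors, so the data is not collected on the driver.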

Upvotes: 1

abaghel

Reputation: 15307

You can use DataStax's spark-cassandra-connector (https://github.com/datastax/spark-cassandra-connector), which lets you store a JavaDStream directly in Cassandra.

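import java.util.Map;
import org.apache.spark.streaming.api.java.JavaDStream;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

Map<String, String> columnNameMappings;   // overrides for MyTableData property name -> column name
JavaDStream<MyTableData> myDStream;       // the stream produced by your business logic
javaFunctions(myDStream).writerBuilder("mykeyspace", "my_table",
        CassandraJavaUtil.mapToRow(MyTableData.class, columnNameMappings)).saveToCassandra();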

Upvotes: 3
