I am reading Kafka streaming messages using Spark Streaming.
Now I want to use Cassandra as my output.
I have created a table "test_table" in Cassandra with the columns "key text PRIMARY KEY" and "value text".
I have successfully mapped the data into a JavaDStream<Tuple2<String,String>> data
like this:
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<Tuple2<String,String>> data = messages.map(new Function<Tuple2<String,String>, Tuple2<String,String>>()
{
    public Tuple2<String,String> call(Tuple2<String, String> message)
    {
        return new Tuple2<String,String>(message._1(), message._2());
    }
}
);
Then I created a List:
List<TestTable> list = new ArrayList<TestTable>();
where TestTable is my custom class with the same structure as my Cassandra table, with members "key" and "value":
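// Serializable so instances can be shipped between the Spark driver and executors
class TestTable implements java.io.Serializable
{
    String key;
    String value;   // named "value" so the bean property matches the Cassandra column
    public TestTable() {}
    public TestTable(String k, String v)
    {
        key = k;
        value = v;
    }
    public String getKey() {
        return key;
    }
    public void setKey(String k) {
        key = k;
    }
    public String getValue() {
        return value;
    }
    public void setValue(String v) {
        value = v;
    }
    public String toString() {
        return "Key:" + key + ",Val:" + value;
    }
}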
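The connector's JavaBean mapping (for example mapToRow(TestTable.class), or javaFunctions(rdd, TestTable.class) in the older API) should match table columns against JavaBean property names, and those names come from the getters and setters rather than the fields: a getVal/setVal pair defines a property called val, which does not match the column value. As a JDK-only check, java.beans.Introspector reports the property names a class exposes. WithVal and WithValue are hypothetical stand-ins, and the connector's own matching rules (such as any camelCase to snake_case translation) are not reproduced here.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

public class BeanPropertyNames {

    // Hypothetical bean using getVal/setVal: exposes a property named "val".
    public static class WithVal {
        private String key;
        private String val;
        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getVal() { return val; }
        public void setVal(String v) { val = v; }
    }

    // Hypothetical bean using getValue/setValue: exposes a property named "value".
    public static class WithValue {
        private String key;
        private String value;
        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getValue() { return value; }
        public void setValue(String v) { value = v; }
    }

    // Collects bean property names, stopping at Object so "class" is excluded.
    public static Set<String> propertyNames(Class<?> type) {
        try {
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                names.add(pd.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(propertyNames(WithVal.class));   // [key, val]
        System.out.println(propertyNames(WithValue.class)); // [key, value]
    }
}
```

Only the second class exposes a property whose name matches the value column.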
Please suggest how I can add the data from JavaDStream<Tuple2<String,String>> data into List<TestTable> list.
I am doing this so that I can subsequently use
JavaRDD<TestTable> rdd = sc.parallelize(list);
javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");
to save the RDD data into Cassandra.
I tried coding it this way:
messages.foreachRDD(new Function<Tuple2<String,String>, String>()
{
    public List<TestTable> call(Tuple2<String,String> message)
    {
        String k = message._1();
        String v = message._2();
        TestTable tbl = new TestTable(k, v);
        list.put(tbl);
    }
}
);
but there seems to be a type mismatch. Please help.
Assuming that the intention of this program is to save the streaming data from Kafka into Cassandra, it's not necessary to dump the JavaDStream<Tuple2<String,String>> data into a List<TestTable> list.
The DataStax Spark-Cassandra connector supports writing a DStream to Cassandra directly through its Spark Streaming extensions.
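It should be sufficient to convert each Tuple2 into a TestTable and use those extensions on the resulting JavaDStream. The conversion is needed because writerBuilder expects a row writer for the stream's own element type, so mapToRow(TestTable.class) cannot be applied to a stream of Tuple2 directly (javaFunctions and mapToRow are static methods of com.datastax.spark.connector.japi.CassandraJavaUtil):
JavaDStream<TestTable> rows = data.map(t -> new TestTable(t._1(), t._2()));
javaFunctions(rows).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
instead of draining the data into an intermediate list.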
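As for the attempted foreachRDD code: foreachRDD hands its function one RDD per batch, not one Tuple2, and List has add rather than put. Even with those fixed, moving the per-record add into an RDD operation such as rdd.foreach(...) is not guaranteed to fill the driver's list, because Spark serializes that closure and may run it against a copy of list on an executor (the Spark programming guide's "Understanding closures" section describes this). The JDK-only simulation below mimics that by round-tripping a closure through Java serialization. It does not use Spark, and ClosureCopyDemo, AddToList and shipToExecutor are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ClosureCopyDemo {

    // Hypothetical stand-in for a Spark closure that captures the driver's list.
    static class AddToList implements Consumer<String>, Serializable {
        final List<String> target;
        AddToList(List<String> target) { this.target = target; }
        @Override
        public void accept(String record) { target.add(record); }
    }

    // Simulates shipping a task to an executor: serialize, then deserialize a copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipToExecutor(T closure) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(closure);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {driver list size, executor copy size} after processing three records.
    public static int[] runDemo() {
        List<String> driverList = new ArrayList<>();
        AddToList executorCopy = shipToExecutor(new AddToList(driverList));
        for (String record : Arrays.asList("k1", "k2", "k3")) {
            executorCopy.accept(record);   // mutates the deserialized copy only
        }
        return new int[] { driverList.size(), executorCopy.target.size() };
    }

    public static void main(String[] args) {
        int[] sizes = runDemo();
        // prints "driver list: 0, executor copy: 3"
        System.out.println("driver list: " + sizes[0] + ", executor copy: " + sizes[1]);
    }
}
```

Writing each batch straight to Cassandra from the stream avoids relying on driver-side state at all.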