Reputation: 1
Goal: read from Kafka with Spark Streaming and store the data in Cassandra, using the Java Spark Cassandra Connector 1.6. Data input: a simple JSON line object, {"id":"1","field1":"value1"}
I have a Java class that reads from Kafka via Spark Streaming, processes the data, and then stores it in Cassandra.
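For reference, here is a minimal sketch of the POJO the Gson calls below could bind to; the field names are assumed from the sample JSON input and from the setNewData call in the snippet, not taken from the original post:

    // Hypothetical POJO inferred from the sample JSON {"id":"1","field1":"value1"}.
    public class MyClass {
        private String id;
        private String field1;
        private String newData; // populated later via setNewData

        public void setNewData(String newData) {
            this.newData = newData;
        }
    }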
Here is the main code:
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(ssc, targetKafkaServerPort, targetTopic, topicMap);

    JavaDStream<List<Object>> list = messages.map(new Function<Tuple2<String, String>, List<Object>>() {
        public List<Object> call(Tuple2<String, String> tuple2) {
            List<Object> list = new ArrayList<Object>();
            Gson gson = new Gson();
            // Deserialize the JSON message, enrich it, and re-serialize it.
            MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
            myclass.setNewData("new_data");
            String jsonInString = gson.toJson(myclass);
            list.add(jsonInString);
            return list;
        }
    });
The next code is incorrect:
    javaFunctions(list)
        .writerBuilder("schema", "table", mapToRow(JavaDStream.class))
        .saveToCassandra();
Because "javaFunctions" method expect a JavaRDD object and "list" is a JavaDStream...
I´d need to cast JavaDStream to JavaRDD but I don´t find the right way...
Any help?
Upvotes: 0
Views: 662
Reputation: 1
Hmm, not really... What I've done is use foreachRDD after creating the DStream:
    dStream.foreachRDD(new Function<JavaRDD<MyObject>, Void>() {
        @Override
        public Void call(JavaRDD<MyObject> rdd) throws Exception {
            if (rdd != null) {
                // Each micro-batch arrives as a JavaRDD, which javaFunctions accepts.
                javaFunctions(rdd)
                    .writerBuilder("schema", "table", mapToRow(MyObject.class))
                    .saveToCassandra();
                logging(" --> Saved data to cassandra", 1, null);
            }
            return null;
        }
    });
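Note that as of Spark 1.6 this Function<JavaRDD<MyObject>, Void> overload of foreachRDD is deprecated in favor of VoidFunction. A minimal sketch of the same per-batch write with the VoidFunction overload (needs import org.apache.spark.api.java.function.VoidFunction; same hypothetical "schema"/"table" names as above):

    dStream.foreachRDD(new VoidFunction<JavaRDD<MyObject>>() {
        @Override
        public void call(JavaRDD<MyObject> rdd) throws Exception {
            // Same write as above, without the dummy Void return value.
            javaFunctions(rdd)
                .writerBuilder("schema", "table", mapToRow(MyObject.class))
                .saveToCassandra();
        }
    });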
Hope it's useful...
Upvotes: 0
Reputation: 21
Use import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.* instead of com.datastax.spark.connector.japi.CassandraJavaUtil.* — the streaming variant's javaFunctions accepts a JavaDStream directly, so you don't need to unwrap the RDDs yourself.
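A minimal sketch, assuming the same dStream of MyObject and the "schema"/"table" names from the workaround above:

    import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; // mapToRow still comes from here

    // No explicit foreachRDD needed: the streaming overload wires up
    // the per-batch saves to Cassandra for you.
    javaFunctions(dStream)
        .writerBuilder("schema", "table", mapToRow(MyObject.class))
        .saveToCassandra();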
Upvotes: 1