Reputation: 994
I have a custom foreach writer for Spark streaming. For each row I write to JDBC source. I also want to do somekind of fast lookup before I perform a JDBC operation and update the value after I perform JDBC operations, like "Step-1" and "Step-3" in below sample code ...
I don't want to use external databases like REDIS, MongoDB. I want something with low foot print like RocksDB, Derby, etc ...
I'm okay with storing one-file per application, just like checkpointing , I'll create a internal-db folder ...
I could not see any in-memory DB for Spark ..
def main(args: Array[String]): Unit = {
val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"
val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();
val sparkContext = sparkSession.sparkContext;
val sqlContext = sparkSession.sqlContext;
val kafkaDataframe = sparkSession.readStream.format("kafka")
.options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
"startingOffsets" -> "latest", "" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
val sqlDataframe = sqlContext.sql("select concat ( topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")
val customForEachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = {
println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
override def process(value: Row) = {
// Step 1 ==> Lookup a key in persistent KEY-VALUE store
// JDBC operations
// Step 3 ==> Update the value in persistent KEY-VALUE store
override def close(errorOrNull: Throwable) = {
println(" ************** Closed ****************** ")
val yy = sqlDataframe
Upvotes: 1
Views: 1708
Reputation: 71
What you are looking for, "Spark and your in-memory DB as one seamless cluster, sharing a single process space", with support for MVCC is exactly what SnappyData provides. With SnappyData, the tables that you want to do a fast lookup on are in the same process that is running your Spark streaming job. Check it out here
SnappyData has a Apache V2 license for the core product and the specific use that you are referring to is available in the OSS download.
(Disclosure: I am a SnappyData employee and it makes sense to provide a product specific answer to this question because the product is the answer to the question)
Upvotes: 2