M80

Reputation: 994

Persistent in-memory database in Apache Spark

I have a custom ForeachWriter for Spark Structured Streaming. For each row I write to a JDBC sink. Before the JDBC operation I also want to do some kind of fast lookup, and after the JDBC operation I want to update the stored value, like "Step 1" and "Step 3" in the sample code below ...

I don't want to use external databases like Redis or MongoDB. I want something with a low footprint like RocksDB, Derby, etc ...

I'm okay with storing one file per application, just like checkpointing; I'll create an internal-db folder ...

I could not find any in-memory DB for Spark ..

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

def main(args: Array[String]): Unit = {

  val brokers = "quickstart:9092"
  val topic = "safe_message_landing_app_4"

  val sparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("Ganesh-Kafka-JDBC-Streaming")
    .getOrCreate()

  val sparkContext = sparkSession.sparkContext
  sparkContext.setLogLevel("ERROR")
  val sqlContext = sparkSession.sqlContext

  // Kafka source; the consumer group id is managed by Spark itself,
  // and checkpointing is configured on the write side, not here
  val kafkaDataframe = sparkSession.readStream.format("kafka")
    .options(Map(
      "kafka.bootstrap.servers" -> brokers,
      "subscribe" -> topic,
      "startingOffsets" -> "latest"))
    .load()

  kafkaDataframe.printSchema()
  kafkaDataframe.createOrReplaceTempView("kafka_view")
  val sqlDataframe = sqlContext.sql(
    "select concat(topic, '-', partition, '-', offset) as KEY, cast(value as string) as VALUE from kafka_view")

  val customForEachWriter = new ForeachWriter[Row] {
    override def open(partitionId: Long, version: Long): Boolean = {
      println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
      true
    }

    override def process(value: Row): Unit = {
      // Step 1 ==> Look the key up in a persistent KEY-VALUE store

      // Step 2 ==> JDBC operations

      // Step 3 ==> Update the value in the persistent KEY-VALUE store
    }

    override def close(errorOrNull: Throwable): Unit = {
      println(" ************** Closed ****************** ")
    }
  }

  val query = sqlDataframe
    .writeStream
    .queryName("foreachquery")
    .option("checkpointLocation", "cp/kafka_reader")
    .foreach(customForEachWriter)
    .start()

  query.awaitTermination()

  sparkSession.close()
}
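
To make the intent concrete, this is roughly what I'd like to do inside the writer with an embedded store such as RocksDB. This is just a sketch: it assumes the rocksdbjni dependency on the classpath, and the internal-db path and per-partition folders are placeholders I chose so that partitions don't contend for the same files.

import org.rocksdb.{Options, RocksDB}

val rocksDbForEachWriter = new ForeachWriter[Row] {
  @transient private var db: RocksDB = _

  override def open(partitionId: Long, version: Long): Boolean = {
    RocksDB.loadLibrary()
    val options = new Options().setCreateIfMissing(true)
    // one folder per partition under the application's internal-db directory, similar to a checkpoint folder
    db = RocksDB.open(options, s"internal-db/partition-$partitionId")
    true
  }

  override def process(value: Row): Unit = {
    val key = value.getAs[String]("KEY").getBytes("UTF-8")
    val previous = Option(db.get(key))                               // Step 1: fast local lookup
    // ... JDBC operations, possibly using `previous` ...
    db.put(key, value.getAs[String]("VALUE").getBytes("UTF-8"))      // Step 3: update the store
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (db != null) db.close()
  }
}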

Upvotes: 1

Views: 1708

Answers (1)

user1483833

Reputation: 71

Manjesh,

What you are looking for, "Spark and your in-memory DB as one seamless cluster, sharing a single process space", with support for MVCC, is exactly what SnappyData provides. With SnappyData, the tables that you want to do fast lookups on live in the same process that is running your Spark streaming job. Check it out here
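
As a rough sketch of the idea (API names taken from the SnappyData docs; the table name, column sizes, and queries are just placeholders you would adapt), the streaming job creates a row table through a SnappySession and the ForeachWriter can query and upsert it in-process:

import org.apache.spark.sql.SnappySession

val snappy = new SnappySession(sparkSession.sparkContext)

// a row table living in the same process/cluster as the streaming job
snappy.sql("CREATE TABLE IF NOT EXISTS lookup_kv (key VARCHAR(100) PRIMARY KEY, value VARCHAR(1000)) USING row")

// inside the ForeachWriter:
// Step 1: fast lookup
//   snappy.sql(s"SELECT value FROM lookup_kv WHERE key = '$k'").collect()
// Step 3: upsert after the JDBC operation
//   snappy.sql(s"PUT INTO lookup_kv VALUES ('$k', '$v')")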

SnappyData's core product is available under an Apache V2 license, and the specific use you are referring to is available in the OSS download.

(Disclosure: I am a SnappyData employee, and it makes sense to provide a product-specific answer to this question because the product is the answer to the question)

Upvotes: 2
