Reputation: 35
I'm considering using Apache Spark Streaming for some real-time work, but I'm not sure how to cache a dataset for use in a join/lookup.
The main input will be JSON records coming from Kafka that contain an ID, and I want to translate that ID into a name using a lookup dataset. The lookup dataset resides in MongoDB, but it changes very rarely (once every couple of hours), so I want to cache it inside the Spark process. I don't want to hit Mongo for every input record or reload all the records in every Spark batch, but I do need to be able to refresh the data held in Spark periodically (e.g. every 2 hours).
What is the best way to do this?
Thanks.
Upvotes: 1
Views: 4082
Reputation: 1137
Here is a fairly simple work-flow:
For each batch of data:
Upvotes: 0
Reputation: 31515
I've thought long and hard about this myself. In particular, I've wondered whether it is possible to implement a database of sorts in Spark.
Well, the answer is kind of yes. You want a program that first caches the main dataset in memory, then every couple of hours does an optimized join-with-tiny to update it. Apparently Spark will get a method that does a join-with-tiny (maybe it's already out in 1.0.0; my stack is stuck on 0.9.0 until CDH 5.1.0 is out).
Anyway, you can implement a join-with-tiny manually by taking the two-hourly update dataset, turning it into a HashMap, and broadcasting it as a broadcast variable. This means the HashMap is copied only once per node (compare this with just referencing the Map in a closure, where it would be copied once per task, a much greater cost). Then you take your main dataset and apply the new records to it using the broadcast map. You can then periodically (say, nightly) save to HDFS or something.
So here is some scruffy pseudo code to elucidate:
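import org.apache.spark.rdd.RDD

// KeyType, DataType, parseJsonAndGetTheKey, update and everyTwoHoursDo are placeholders.
var mainDataSet: RDD[(KeyType, DataType)] = sc.textFile("/path/to/main/dataset")
  .map(parseJsonAndGetTheKey).cache()

everyTwoHoursDo {
  // Collect the small update set to the driver, then broadcast it (one copy per node).
  val newData: Map[KeyType, DataType] = sc.textFile("/path/to/last/two/hours")
    .map(parseJsonAndGetTheKey).collect().toMap
  val newDataBc = sc.broadcast(newData)

  // Update each existing record if the broadcast map has a newer value for its key.
  val mainDataSetNew = mainDataSet.map { case (key, oldValue) =>
    (key, newDataBc.value.get(key)
      .map(newDataValue => update(oldValue, newDataValue))
      .getOrElse(oldValue))
  }.cache()

  mainDataSetNew.count() // any action will do; this forces execution
  mainDataSet.unpersist()
  mainDataSet = mainDataSetNew
}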
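The `everyTwoHoursDo` block above is left undefined. For the streaming case in the question, one possible way to get the same effect is a small holder that reloads the lookup table once it is older than a given age. This is only a sketch in plain Scala with no Spark dependency: `RefreshingLookup`, `load`, and the injected `now` clock are hypothetical names, and `load` stands in for whatever actually queries MongoDB.

```scala
// Hypothetical helper: caches a lookup table and reloads it once it is older than maxAgeMs.
// `load` is an assumption standing in for the real data source (e.g. a MongoDB query).
final class RefreshingLookup[K, V](
    load: () => Map[K, V],
    maxAgeMs: Long,
    now: () => Long = () => System.currentTimeMillis()
) {
  private var loadedAt: Long = 0L
  private var table: Map[K, V] = Map.empty
  private var initialised = false

  /** Returns the cached table, reloading it first if it is missing or stale. */
  def current(): Map[K, V] = synchronized {
    val t = now()
    if (!initialised || t - loadedAt >= maxAgeMs) {
      table = load()
      loadedAt = t
      initialised = true
    }
    table
  }
}

object RefreshingLookupDemo {
  def main(args: Array[String]): Unit = {
    var fakeTime = 0L // injected clock, so the demo does not have to wait two hours
    var loads = 0     // counts how often the (fake) data source is hit
    val twoHoursMs = 2L * 60 * 60 * 1000
    val lookup = new RefreshingLookup[Int, String](
      () => { loads += 1; Map(1 -> s"name-v$loads") },
      twoHoursMs,
      () => fakeTime
    )
    println(lookup.current()(1)) // first call triggers a load
    fakeTime += 60 * 1000
    println(lookup.current()(1)) // one minute later: served from the cache
    fakeTime += twoHoursMs
    println(lookup.current()(1)) // more than two hours later: reloaded
  }
}
```

In a Spark Streaming job, `current()` could be called on the driver once per batch (for example inside `foreachRDD` or `transform`), and the map broadcast again whenever it has been reloaded. That wiring is an assumption and is not shown here.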
I've also thought that you could be very clever and use a custom partitioner with your own custom index, and then use a custom way of updating the partitions so that each partition itself holds a submap. Then you can skip updating partitions that you know won't hold any keys that occur in the newData, and also optimize the updating process.
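To make the partition-skipping idea concrete, here is a rough sketch that models partitions with plain Scala collections rather than Spark RDDs. Everything in it (`PartitionSkippingUpdate`, `partitionOf`, the hash-modulo partitioning, the `merge` function) is a hypothetical illustration, not Spark API. It also assumes that keys missing from the main dataset should be added, which goes slightly beyond the pseudocode above.

```scala
object PartitionSkippingUpdate {
  // Non-negative modulo, so negative hash codes still map to a valid partition index.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  /** Applies newData to the submaps; partitions with no incoming keys are returned as-is. */
  def update[K, V](
      partitions: Vector[Map[K, V]],
      newData: Map[K, V],
      merge: (V, V) => V
  ): Vector[Map[K, V]] = {
    val n = partitions.length
    // Group the incoming records by the partition their key belongs to.
    val byPartition: Map[Int, Map[K, V]] =
      newData.groupBy { case (k, _) => partitionOf(k, n) }
    partitions.zipWithIndex.map { case (part, idx) =>
      byPartition.get(idx) match {
        case None => part // no incoming keys for this partition: skip it entirely
        case Some(changes) =>
          // Merge with the old value if the key exists, otherwise insert the new value.
          changes.foldLeft(part) { case (acc, (k, v)) =>
            acc.updated(k, acc.get(k).map(old => merge(old, v)).getOrElse(v))
          }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Two partitions, laid out consistently with partitionOf (even keys in 0, odd keys in 1).
    val parts = Vector(Map(0 -> "a", 2 -> "c"), Map(1 -> "b", 3 -> "d"))
    val updated = update[Int, String](parts, Map(1 -> "B"), (_, fresh) => fresh)
    println(updated)
    println(updated(0) eq parts(0)) // true: partition 0 was not rebuilt
  }
}
```

With real RDDs, something similar might be done with `mapPartitionsWithIndex` on a dataset partitioned by a known partitioner, but I have not tried that.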
I personally think this is a really cool idea, and the nice thing is that your dataset is already in memory, ready for some analysis / machine learning. The downside is that you're kind of reinventing the wheel a bit. It might be a better idea to look at using Cassandra, as DataStax is partnering with Databricks (the people who make Spark) and might end up supporting something like this out of the box.
Further reading:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
Upvotes: 3