user1052610
user1052610

Reputation: 4719

Caching in Spark

A function is defined to transform an RDD. Therefore, the function is called once for each element in the RDD.

The function needs to call an external web service to look up reference data, passing as a parameter data from the current element in the RDD.

Two questions:

  1. Is there an issue with issuing a web service call within Spark?

  2. The data from the web service needs to be cached. What is the best way to hold (and subsequently reference) the cached data? The simple way would be to hold the cache in a collection with the Scala class which contains the function being passed to the RDD. Would this be efficient, or is there a better approach for caching in Spark?

Thanks

Upvotes: 1

Views: 3079

Answers (1)

Tzach Zohar
Tzach Zohar

Reputation: 37852

There isn't really any mechanism for "caching" (in the sense that you mean). Seems like the best approach would be to split this task into two phases:

  1. Get the distinct "keys" by which you must access the external lookup, and perform the lookup once for each key
  2. Use this mapping to perform the lookup for each record in the RDD

I'm assuming there would potentially be many records accessing the same lookup key (otherwise "caching" won't be of any value anyway), so performing the external calls for the distinct keys is substantially faster.

How should you implement this?

  • If you know this set of distinct keys is small enough to fit into your driver machine's memory:

    • map your data into the distinct keys by which you'd want to cache these fetched values, and collect it, e.g. : val keys = inputRdd.map(/* get key */).distinct().collect()
    • perform the fetching on driver-side (not using Spark)
    • use the resulting Map[Key, FetchedValues] in any transformation on your original RDD - it will be serialized and sent to each worker where you can perform the lookup. For example, assuming the input has records for which the foreignId field is the lookup key:

      val keys = inputRdd.map(record => record.foreignId).distinct().collect()
      val lookupTable = keys.map(k => (k, fetchValue(k))).asMap
      val withValues = inputRdd.map(record => (record, lookupTable(record.foreignId)))
      
    • Alternatively - if this map is large (but still can fit in driver memory), you can broadcast it before you use it in RDD transformation - see Broadcast Variables in Spark's Programming Guide

  • Otherwise (if this map might be too large) - you'll need to use join if you want keep data in the cluster, but to still refrain from fetching the same element twice:

    val byKeyRdd = inputRdd.keyBy(record => record.foreignId)
    val lookupTableRdd = byKeyRdd
      .keys()
      .distinct()
      .map(k => (k, fetchValue(k))) // this time fetchValue is done in cluster - concurrently for different values
    val withValues = byKeyRdd.join(lookupTableRdd)
    

Upvotes: 3

Related Questions