user1052610

Reputation: 4719

How to define global read/write variables in Spark

Spark has broadcast variables, which are read-only, and accumulator variables, which can be updated by the nodes but not read. Is there a way - or a workaround - to define a variable which is both updatable and readable?

One requirement for such a read/write global variable would be to implement a cache. As files are loaded and processed as RDDs, calculations are performed. The results of these calculations - produced on several nodes running in parallel - need to be placed into a map which has as its key some of the attributes of the entity being processed. As subsequent entities within the RDDs are processed, the cache is queried.
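A minimal sketch of the pattern being described, and of why a plain driver-side map does not behave as a global read/write variable: the `Entity` type, input path, and stand-in calculation are hypothetical, and `sc` is the usual SparkContext (e.g. in spark-shell). The closure ships a copy of the map to each task, so updates made on the executors never reach the driver.

```scala
import scala.collection.concurrent.TrieMap

case class Entity(key: String, payload: String)

// Defined in the driver program that created the SparkContext (sc).
val cache = TrieMap.empty[String, Double]

// Hypothetical input: one entity per line, key before the first comma.
val entities = sc.textFile("entities.txt")
  .map(line => Entity(line.takeWhile(_ != ','), line))

val results = entities.map { e =>
  cache.get(e.key) match {                 // each task sees its own deserialized copy of the map
    case Some(v) => v
    case None =>
      val v = e.payload.length.toDouble    // stand-in for the real calculation
      cache.put(e.key, v)                  // update stays on the executor's copy
      v
  }
}

results.count()
println(cache.size)                        // still 0 in the driver: nothing was shared back
```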

Scala does have ScalaCache, which is a facade for cache implementations such as Google Guava. But how would such a cache be included and accessed within a Spark application?
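For reference, a library-backed cache can at least be held in a Scala singleton object, which is initialized lazily and independently on every executor JVM that loads the class. The sketch below uses Guava directly rather than the ScalaCache facade, and reuses the hypothetical `entities` RDD and `Entity` type from the sketch above; note that this gives one independent cache per executor rather than a single shared one, which is part of what the answer below addresses.

```scala
import com.google.common.cache.{Cache, CacheBuilder}

// Initialized lazily on each executor JVM, so every executor gets its own
// cache instance rather than one cluster-wide cache.
object EntityCache {
  lazy val cache: Cache[String, java.lang.Double] =
    CacheBuilder.newBuilder()
      .maximumSize(10000)
      .build[String, java.lang.Double]()
}

// Stand-in for the real, expensive calculation (hypothetical).
def expensiveCalculation(e: Entity): Double = e.payload.length.toDouble

val enriched = entities.map { e =>
  Option(EntityCache.cache.getIfPresent(e.key)) match {
    case Some(v) => v.doubleValue               // hit: reuse this executor's cached value
    case None =>
      val v = expensiveCalculation(e)
      EntityCache.cache.put(e.key, Double.box(v)) // miss: compute and cache locally
      v
  }
}
```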

The cache could be defined as a variable in the driver application which creates the SparkContext. But then there would be two issues:

What is the best way to implement and store such a cache?

Thanks

Upvotes: 12

Views: 13099

Answers (1)

zero323

Reputation: 330163

Well, the best way of doing this is not doing it at all. In general, the Spark processing model doesn't provide any guarantees* regarding

  • where,
  • when,
  • in what order (excluding of course the order of transformations defined by the lineage / DAG)
  • and how many times

a given piece of code is executed. Moreover, any updates which depend directly on the Spark architecture are not granular.

These are the properties which make Spark scalable and resilient, but at the same time they are what makes shared mutable state very hard to implement and, most of the time, completely useless.

If all you want is a simple cache then you have multiple options:

  • use one of the methods described by Tzach Zohar in Caching in Spark
  • use local caching (per JVM or executor thread) combined with application-specific partitioning to keep things local (see the sketch after this list)
  • for communication with external systems, use a node-local cache independent of Spark (for example an Nginx proxy for HTTP requests)
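A rough sketch of the local-caching option, under the assumption that the data can be keyed and partitioned by the same attributes used as cache keys; the sample data, partition count, and stand-in calculation are all illustrative. Because `partitionBy` sends every record with a given key to the same task, a cache that lives only inside `mapPartitions` is sufficient and no cross-JVM sharing is needed.

```scala
import org.apache.spark.HashPartitioner
import scala.collection.mutable

// Hypothetical pair RDD keyed by the entity attributes; replace with the real data.
val records = sc.parallelize(Seq(("a", "payload1"), ("b", "payload2"), ("a", "payload3")))

// Partition by key so every record with the same key is processed by the same task.
val partitioned = records.partitionBy(new HashPartitioner(48))

val results = partitioned.mapPartitions { iter =>
  val localCache = mutable.Map.empty[String, Int]   // lives only for this task
  iter.map { case (key, payload) =>
    // Compute once per key within the partition, reuse for subsequent records.
    val value = localCache.getOrElseUpdate(key, payload.length) // stand-in calculation
    (key, value)
  }
}

results.collect().foreach(println)
```

If the cache needs to survive across tasks or jobs, the same pattern can be combined with a per-JVM singleton (as in the Guava sketch in the question) instead of a per-partition map.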

If your application requires much more complex communication, you may try different message passing tools to keep state synchronized, but in general this requires complex and potentially fragile code.


* This partially changed in Spark 2.4, with the introduction of barrier execution mode (SPARK-24795, SPARK-24822).

Upvotes: 11
