CᴴᴀZ
CᴴᴀZ

Reputation: 531

Databricks Spark Reference Applications: State maintenance using static variables in Spark Streaming

1.1.3.2 Cumulative Calculations section mentions usage of static variables for maintaining:

My Understanding: In Distributed Mode, the DStream: contentSizeDStream (an infinite series of RDDs) would be partitioned and distributed amongst multiple Workers for processing, and each Worker will have its own copy of static variables (runningCount...runningMax).

Q: How would static variables help in maintaining an SSOT state?

Code snippet:

JavaDStream<Long> contentSizeDStream =
accessLogDStream.map(ApacheAccessLog::getContentSize).cache();
contentSizeDStream.foreachRDD(rdd -> {
if (rdd.count() > 0) {
runningSum.getAndAdd(rdd.reduce(SUM_REDUCER));
runningCount.getAndAdd(rdd.count());
runningMin.set(Math.min(runningMin.get(), rdd.min(Comparator.naturalOrder())));
runningMax.set(Math.max(runningMax.get(), rdd.max(Comparator.naturalOrder())));
System.out.print("Content Size Avg: " + runningSum.get() / runningCount.get());
System.out.print(", Min: " + runningMin.get());
System.out.println(", Max: " + runningMax.get());
}
return null;
});

Upvotes: 1

Views: 86

Answers (1)

maasg
maasg

Reputation: 37435

While the DStream represents a distributed stream data, the operations reveal a multi-level abstraction: Operations that apply to elements, such as map, filter, and their classical friends all operate distributed in the cluster. In the code above, accessLogDStream.map(ApacheAccessLog::getContentSize) is an example of that. contentSize will apply distributed over the elements of the accessLogDStream.

Then, we have operations that apply to RDDs, such as transform and foreachRDD. They operate on the underlying RDD abstraction. The code in the closures of these operations gets attached to the SparkStreaming scheduler and execute on the driver, not distributed. Yet, as they provide access to an RDD, we can still apply distributed operations to the RDD and get the results to the single driver context.

Going back to the example code, runningCount is a variable local to the context of the driver. When we do runningCount.getAndAdd(rdd.count()); we are effectively applying a distributed count operation over the rdd, collecting the totalized count over all partitions and executors to the driver and then adding that number to the value of runningCount in the local context.

That way, the streaming program we can keep a centralized summary of the data distributed in the cluster, which can be further consumed by other applications that are interested in the results of the streaming job.

Upvotes: 1

Related Questions