Reputation: 180
Can someone please explain to me how Spark Streaming executes the window() operation? From the Spark 1.6.1 documentation, windowed batches seem to be automatically cached in memory, but looking at the web UI it appears that operations already executed in previous batches are executed again. For your convenience, I attach a screenshot of my running application below:
Looking at the web UI, it seems that the flatMapValues() RDDs are cached (the green dot; this is the last operation executed before I call window() on the DStream), but at the same time it also seems that all the transformations leading up to flatMapValues() in previous batches are executed again. If this is the case, the window() operation may incur a huge performance penalty, especially with a window duration of 1 or 2 hours (as I expect for my application). Do you think that checkpointing the DStream at that point could help (a rough sketch of what I mean is below)? Consider that the expected slide interval is about 5 minutes.
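Here is roughly the checkpointing I have in mind (jssc is my JavaStreamingContext, stream1 is the windowed DStream from the snippet below; the directory and interval are just placeholders):

jssc.checkpoint("hdfs:///user/me/checkpoints");   // placeholder path
// Truncate the lineage periodically so old batches need not be recomputed
stream1.checkpoint(Durations.minutes(10));        // placeholder interval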
Hope someone can clarify this point.
I add a code snippet below; stream1 and stream2 are data feeds read from HDFS:
JavaPairDStream<String, String> stream1 = cdr_orig.mapToPair(parserFunc)
        .flatMapValues(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(","));
            }
        })
        .window(Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION));
JavaPairDStream<String, String> join = stream2.join(stream1);
The two streams are produced periodically by another system. The streams are asynchronous, meaning that a record present in stream2 at time t appears in stream1 at some time t' <= t. I'm using window() to retain stream1 records for 1-2 hours, but this could be inefficient if the operations on past batches of stream1 are re-executed at every new batch.
Upvotes: 3
Views: 1481
Reputation: 2745
First of all, yes, window() does cache the DStream by calling persist() on it. By caching here I mean the data is kept in memory. The default storage level is StorageLevel.MEMORY_ONLY_SER, i.e.:
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
What the window() transformation does is return a new DStream in which each RDD contains all the elements seen in a sliding window of time over the source DStream. Internally, it creates a WindowedDStream object which calls persist() to cache the parent DStream, and as per the Spark API docs it "persists at parent level by default, as those RDDs are going to be obviously reused."
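As a side note, you can override that default storage level. A minimal sketch, assuming the stream1 definition from the question (WindowedDStream forwards persist() to its parent, so this changes the level at which the parent's per-batch RDDs are cached):

// Must be called before the StreamingContext is started.
// For a windowed stream, persist() is forwarded to the parent DStream,
// so the per-batch RDDs (not the window unions) are cached at this level.
stream1.persist(StorageLevel.MEMORY_AND_DISK_SER());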
So you cannot rely on window() for caching your DStreams the way you want. If you want to cut down on repeated transformations, you should call persist() on the DStream yourself, before the other transformations.
In your case, try calling persist() as shown:
cdr_orig.persist(StorageLevel.MEMORY_AND_DISK());
before the mapToPair() transformation. You will see that a more compact DAG is formed, with the green dot at the top.
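Putting it together, a minimal sketch of the reworked pipeline, reusing the names from your snippet (cdr_orig, parserFunc, WINDOW_DURATION, SLIDE_DURATION):

import org.apache.spark.storage.StorageLevel;

// Cache the raw input so the parsing/splitting steps read their input from
// memory (or local disk) instead of re-reading it for every window evaluation.
cdr_orig.persist(StorageLevel.MEMORY_AND_DISK());

JavaPairDStream<String, String> stream1 = cdr_orig.mapToPair(parserFunc)
        .flatMapValues(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(","));
            }
        })
        .window(Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION));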
Upvotes: 3