linehrr

Reputation: 1748

Is the Spark Kafka stream reader caching data?

I think this is a good question to ask. I might be able to find the answer in the spark-kafka-streaming source code, and I will do that if no one can answer it here.
Imagine a scenario like this:

val dstream = ...
dstream.foreachRDD { rdd =>
  rdd.count()
  rdd.collect()
}

In the example code above, we are getting micro-batches from the DStream, and for each batch we trigger two actions:

  1. count() how many rows
  2. collect() all the rows

According to Spark's lazy evaluation behaviour, both actions will trace back to the origin of the data source (the Kafka topic). Since we don't have any persist() call or wide transformation, there is nothing in our code logic that would make Spark cache the data it has read from Kafka.

So here is the question: will Spark read from Kafka twice, or just once? This is very performance-relevant, since reading from Kafka involves network I/O and potentially puts more pressure on the Kafka brokers. So if the spark-kafka-streaming lib doesn't cache the data, we should definitely cache()/persist() it before running multiple actions.
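
To make the lazy-evaluation point concrete, here is the same snippet with the lineage printed; each action launches its own job over that lineage, and nothing in the code asks Spark to keep the batch around between the two jobs (toDebugString is only there for illustration):

dstream.foreachRDD { rdd =>
  // the batch RDD's lineage; both jobs below start from it
  println(rdd.toDebugString)
  rdd.count()   // job 1
  rdd.collect() // job 2
}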

Any discussion is welcome. Thanks.

EDIT:
I just found some docs on the official Spark website, and it looks like executor receivers are caching the data. But I don't know whether this applies only to separate receivers, because I read that the Spark Kafka streaming lib doesn't use separate receivers; it receives and processes the data on the same core. See the sketch after the quoted docs below.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization

Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.
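
For reference, this is roughly how the receiver-less (direct) stream gets created. It is a minimal sketch assuming the spark-streaming-kafka-0-10 integration, with placeholder broker, group id and topic names. No separate receiver task is started here, which is why I'm not sure the receiver caching described above applies:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("direct-stream-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                      // placeholder group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// no receiver is started: each micro-batch is an RDD whose partitions map to
// Kafka topic partitions and are read by the processing tasks themselves
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("example-topic"), kafkaParams) // placeholder topic
)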

Upvotes: 1

Views: 362

Answers (2)

linehrr

Reputation: 1748

According to the official Spark docs: http://spark.apache.org/docs/latest/streaming-programming-guide.html#data-serialization

Input data: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.

Upvotes: 1

user9323386

Reputation: 1

There is no implicit caching when working with DStreams, so unless you cache explicitly, every evaluation will hit the Kafka brokers.

If you evaluate multiple times, and brokers are not co-located with Spark nodes, you should definitely consider caching.
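
As a sketch of the explicit caching this refers to, applied to the snippet from the question (the storage level and the unpersist placement are just one option):

import org.apache.spark.storage.StorageLevel

dstream.foreachRDD { rdd =>
  // keep the batch around so the second action reuses it instead of
  // fetching the same offsets from the brokers again
  rdd.persist(StorageLevel.MEMORY_AND_DISK)

  rdd.count()
  rdd.collect()

  // drop the cached blocks once the batch has been processed
  rdd.unpersist()
}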

Upvotes: 0
