Reputation: 1687
Is it possible to cache a source and reuse it without triggering "Substream cannot be materialized more than once"?
I'm doing a stream join that requires a call out to a microservice for each element on the left. That call returns a stream of records to join to. I want to cache the returned source so that repeated calls for the same key reuse the cached stream instead of hitting the microservice again. But the flatMapConcat I'm doing throws the "Substream cannot be materialized more than once" error. The code looks like this:
val cache = new util.HashMap[AnyRef, Source[Array[AnyRef], Any]]()
inputSource
  .flatMapConcat { record =>
    val key = leftKey(record)
    val rightElemSource = if (cache.containsKey(key)) {
      cache.get(key)
    } else {
      val rightElemSourceInner = doSomethingToGetSource()
      cache.put(key, rightElemSourceInner)
      rightElemSourceInner
    }
    rightElemSource.map(join(record, _))
  }
Upvotes: 1
Views: 480
Reputation: 4017
A Source can represent a potentially huge or even infinite stream of data. The one you are caching is, judging by the error message, backed by a substream, and such a source can be traversed only once, like an Iterator. If you really want its content to be reused, you have to collect it into a regular data structure, say, a Seq.
So your cache would be of type util.HashMap[AnyRef, Seq[Array[AnyRef]]].
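As a rough illustration of that suggestion, here is a minimal sketch assuming Akka Streams 2.6 or later (for Source.future and the implicit materializer derived from the ActorSystem). The names CachedJoin, joinWithCache and fetchRight are hypothetical; leftKey and join stand in for the question's helpers and are passed in as parameters. Each right-hand source is drained once into an in-memory Seq, and the cache holds the Future of that Seq rather than the Source itself.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.concurrent.TrieMap
import scala.collection.immutable
import scala.concurrent.Future

object CachedJoin {
  type Row = Array[AnyRef]

  // Hypothetical helper: joins each left record with the right-hand rows for
  // its key, draining the right-hand source at most once per distinct key
  // as long as records are processed one at a time.
  def joinWithCache(
      input: Source[Row, Any],
      leftKey: Row => AnyRef,               // stand-in for the question's leftKey
      fetchRight: AnyRef => Source[Row, Any], // stand-in for the microservice call
      join: (Row, Row) => Row               // stand-in for the question's join
  )(implicit system: ActorSystem): Source[Row, Any] = {
    // Cache the collected rows (as a Future), not the one-shot Source.
    val cache = TrieMap.empty[AnyRef, Future[immutable.Seq[Row]]]

    input.flatMapConcat { record =>
      val key = leftKey(record)
      // The by-name argument runs only on a cache miss: the source is
      // materialized here and its elements are gathered into a Seq.
      val rows = cache.getOrElseUpdate(key, fetchRight(key).runWith(Sink.seq[Row]))
      // Replay the cached rows as a fresh Source for this record.
      Source.future(rows).mapConcat(identity).map(right => join(record, right))
    }
  }
}
```

This only makes sense when each right-hand stream is finite and small enough to hold in memory. A failed Future would also stay in the cache, so a real implementation would probably want to evict failed entries.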
Upvotes: 2