Noah
Noah

Reputation: 1687

Cache an Akka Substream source and reuse it?

Is it possible to cache a source and reuse it without triggering "Substream cannot be materialized more than once"?

I'm doing a stream join which requires a call out to a microservice for each element on the left. That call returns a stream of records to join to. I want to cache the source so that the same calls to the microservice result in the cached stream. But the flatMapConcat I'm doing throws the "Substream cannot be materialized more than once" error. The code looks like this:

    val cache = new util.HashMap[AnyRef, Source[Array[AnyRef], Any]]()

    inputSource
        .flatMapConcat { record =>
          val key = leftKey(record)
          val rightElemSource = if (cache.containsKey(key)) {
            cache.get(key)
          } else {
            val rightElemSourceInner = doSomethingToGetSource()
            cache.put(key, rightElemSourceInner)
            rightElemSourceInner
          }

          rightElemSource.map(join(record, _))
        }

Upvotes: 1

Views: 480

Answers (1)

simpadjo
simpadjo

Reputation: 4017

Source represents a potentially huge or even infinite stream of data. It's designed to be traversed only once like Iterator. If you really want the content of the source to be reused you have to collect it into a regular data structure, say, Seq. So your cache would be of type util.HashMap[AnyRef, Seq[Array[AnyRef], Any]].

Upvotes: 2

Related Questions