Sigurd
Sigurd

Reputation: 107

Map materialized value from inner source in Akka stream

I have a Source<A,Mat1> outer and an operation Source<B, Mat2> buildInner(A a). I need a result of type Source<B,Mat2>.

I can do the following:

Source<B,Mat1> result = outer.flatMapConcat(a -> buildInner(a))

Then, I lose Mat2. I don't see an option to use inner.preMaterialize(actorSystem) to get a instance of Mat2 because the creation buildInner(a) depends on the elements of outer.

Is there any other operator, that allows that?

Upvotes: 0

Views: 40

Answers (1)

Levi Ramsey
Levi Ramsey

Reputation: 20561

In general, because the materialized value of a stream is constructed before a single element is ever seen/emitted by the stream, it can't depend on a value of the stream.

However, it's possible for the materialized value to be a placeholder for a value which will be computed later (e.g. when the stream sees an element). An example of such a placeholder in Java would be a CompletionStage wrapping a Mat2.

Note that since a useful materialized value will by definition be accessed by multiple threads (since the materialized value is the general means for the stream and the outside world to interact), this generally entails using some form of concurrency control (e.g. that provided by CompletionStage).

For example, using AtomicReference as our container to be filled later (and repeatedly, for each inner source):

Pair<AtomicReference<Mat2>, Source<A, NotUsed>> preMatPair = outer.preMaterialize(actorSystem).mapMaterializedValue(__ -> new AtomicReference<Mat2>());

Source<B, AtomicReference<Mat2>> result = preMatPair.second()
    .flatMapConcat(a ->
        buildInner().mapMaterializedValue(mat2 -> {
            preMatPair.first().set(mat2);
            return mat2;
        })
    ).mapMaterializedValue(__ -> preMatPair.first());

This sort of trick requires care in preMaterializeing outer so as not to accidentally share materialized values. Sticking this block in a method and not reusing the value (e.g. if restarting the stream) is a good convention for this purpose.

Upvotes: 0

Related Questions