Reputation: 1640
I want to combine two Akka Streams sources and retain the ActorRef
of the first source so that I can use it after materialization:
val buffer = 100
val apiSource: Source[Data, ActorRef] = Source.actorRef[Data](buffer, OverflowStrategy.backpressure)
  .delay(2.second, DelayOverflowStrategy.backpressure)
val kafkaSource: Source[Data, Consumer.Control] = createConsumer(config.kafkaConsumerConfig, "test")
val combinedSource: Source[Data, NotUsed] = Source.combine(kafkaSource, apiSource)(Merge(_))
The problem is that Source.combine discards the materialized values of its inputs
(the result is a Source[Data, NotUsed]), and I wonder if there is another way of achieving this.
Upvotes: 0
Views: 77
Reputation: 1640
This seems to work:
def combineAndRetainFirst[T, M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, M1] = {
  Source.fromGraph(
    GraphDSL.create(first, second)((m1, _) => m1) { implicit builder => (g1, g2) =>
      import GraphDSL.Implicits._
      val merge = builder.add(Merge[T](2))
      g1 ~> merge.in(0)
      g2 ~> merge.in(1)
      SourceShape(merge.out)
    }
  )
}
Upvotes: 0
Reputation: 1806
You can use Source#mergeMat:
val combinedSource: Source[Data, ActorRef] = kafkaSource.mergeMat(apiSource)(Keep.right)
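A minimal, self-contained sketch of the same idea, assuming Akka 2.6+ (where the implicit ActorSystem supplies the materializer). The names MergeMatExample, runOnce and otherSource are made up for illustration, and a plain in-memory source stands in for the Kafka source from the question. The sketch uses OverflowStrategy.dropHead because Source.actorRef rejects OverflowStrategy.backpressure.

```scala
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergeMatExample {

  def runOnce(): Seq[String] = {
    // Assumption: Akka 2.6+, so no explicit ActorMaterializer is needed.
    implicit val system: ActorSystem = ActorSystem("merge-mat-example")
    try {
      // Hypothetical stand-in for kafkaSource; its materialized value is NotUsed.
      val otherSource = Source(List("k1", "k2"))

      // Source.actorRef does not support OverflowStrategy.backpressure,
      // so a dropping strategy is used here.
      val apiSource: Source[String, ActorRef] =
        Source.actorRef[String](100, OverflowStrategy.dropHead)

      // Keep.right retains the materialized value of the source passed to mergeMat.
      val combined: Source[String, ActorRef] =
        otherSource.mergeMat(apiSource)(Keep.right)

      // Materialize: keep both the ActorRef and the Future with the collected elements.
      val (ref, done) = combined.toMat(Sink.seq[String])(Keep.both).run()

      // Use the retained ActorRef after materialization.
      ref ! "a1"
      // Status.Success completes the actor-backed source after buffered elements are emitted.
      ref ! Status.Success("done")

      Await.result(done, 5.seconds)
    } finally {
      Await.ready(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    // Merge does not guarantee ordering between its inputs, so the order may vary.
    println(runOnce())
}
```

If the materialized value of the other source is needed as well (for example the Consumer.Control from the question), Keep.both should yield a tuple of both values instead.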
Upvotes: 1