stanislav.chetvertkov
stanislav.chetvertkov

Reputation: 1640

Retain materialization type when combining Sources

I want to combine 2 akka streams sources and retain ActorRef of the first source for the actual usage after materialization

val buffer = 100

val apiSource: Source[Data, ActorRef] = Source.actorRef[Data](buffer, OverflowStrategy.backpressure)
  .delay(2.second, DelayOverflowStrategy.backpressure)

val kafkaSource:   Source[Data, Consumer.Control] = createConsumer(config.kafkaConsumerConfig, "test")

val combinedSource: Source[Data, NotUsed] = Source.combine(kafkaSource, apiSource)(Merge(_))

The problem is that combined method ignores materialization types and I wonder if there is another way of achieving this

Upvotes: 0

Views: 77

Answers (2)

stanislav.chetvertkov
stanislav.chetvertkov

Reputation: 1640

This seems to be working

    def combineAndRetainFirst[T,M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, M1] ={
    Source.fromGraph(
      GraphDSL.create(first, second)((m1, _) => m1){ implicit builder => (g1, g2) =>
        import GraphDSL.Implicits._

        val merge = builder.add(Merge[T](2))

        g1 ~> merge.in(0)
        g2 ~> merge.in(1)

        SourceShape(merge.out)
      }
    )
  }

Upvotes: 0

PH88
PH88

Reputation: 1806

You can use Source#mergeMat:

val combinedSource: Source[Data, ActorRef] = kafkaSource.mergeMat(apiSource)(Keep.right)

Upvotes: 1

Related Questions