Reputation: 899
This is a subsequent post of Akka Stream - Select Sink based on Element in Flow.
Assume I have multiple SQS queues I'd like to stream from. I'm using the AWS SQS Connector of Alpakka to create Source
.
implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))
Now, I'd like to combine
the sources to merge them. However, the Source.combine method doesn't support passing a list as parameter, but only support varargs.
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])
Of course, I can finger type all sources parameters. But, the parameter will get pretty long if I have 10 source queues.
Is there a way to combine sources from a list of sources?
[Supplement]
As Ramon J Romero y Vigil pointed out, it's a better practice to keep stream "a thin veneer". In this particular case, however, I use single sqsClient
for all the SqsSource
initialization.
Upvotes: 1
Views: 1250
Reputation: 19517
You could use foldLeft
to concatenate or merge the sources:
val sources: List[Source[Message, NotUsed]] = ???
val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)
val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)
Alternatively, you could use Source.zipN
with flatMapConcat
:
val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)
Upvotes: 3