erip

How can I merge an arbitrary number of sources in Akka stream?

I have n sources that I'd like to merge by priority in Akka Streams. I'm basing my implementation on the GraphMergePrioritizedSpec, in which three prioritized sources are merged. I attempted to abstract away the number of Sources with the following:

import akka.NotUsed
import akka.stream.{ClosedShape, Graph, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}
import org.apache.activemq.ActiveMQConnectionFactory

class SourceMerger(
  sources: Seq[Source[java.io.Serializable, NotUsed]],
  priorities: Seq[Int],
  private val sink: Sink[java.io.Serializable, _]
) {

  require(sources.size == priorities.size, "Each source should have a priority")

  import GraphDSL.Implicits._

  private def partial(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): Graph[ClosedShape, NotUsed] = GraphDSL.create() { implicit b =>

      val merge = b.add(MergePrioritized[java.io.Serializable](priorities))

      sources.zipWithIndex.foreach { case (s, i) =>
        s.shape.out ~> merge.in(i)
      }

      merge.out ~> sink
      ClosedShape
  }

  def merge(
    sources: Seq[Source[java.io.Serializable, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[java.io.Serializable, _]
  ): RunnableGraph[NotUsed] = RunnableGraph.fromGraph(partial(sources, priorities, sink))

  def run()(implicit mat: Materializer): NotUsed = merge(sources, priorities, sink).run()(mat)
}

However, I get an error when running the following stub:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{Matchers, WordSpecLike}
import akka.testkit.TestKit

import scala.collection.immutable.Iterable

class SourceMergerSpec extends TestKit(ActorSystem("SourceMerger")) with WordSpecLike with Matchers {

  implicit val materializer: Materializer = ActorMaterializer()

  "A SourceMerger" should {
    "merge by priority" in {

      val priorities: Seq[Int] = Seq(1,2,3)

      val highPriority = Iterable("message1", "message2", "message3")
      val mediumPriority = Iterable("message4", "message5", "message6")
      val lowPriority = Iterable("message7", "message8", "message9")

      val source1 = Source[String](highPriority)
      val source2 = Source[String](mediumPriority)
      val source3 = Source[String](lowPriority)

      val sources = Seq(source1, source2, source3)

      val subscriber = Sink.seq[java.io.Serializable]

      val merger = new SourceMerger(sources, priorities, subscriber)

      merger.run()

      source1.runWith(Sink.foreach(println))
    }
  }

}

The relevant stacktrace is here:

[StatefulMapConcat.out] is already connected
java.lang.IllegalArgumentException: [StatefulMapConcat.out] is already connected
    at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1304)
    at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase$class.$tilde$greater(Graph.scala:1431)
    at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1521)
    at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:26)
    at SourceMerger$$anonfun$partial$1$$anonfun$apply$1.apply(SourceMerger.scala:25)

It seems that the error comes from this:

sources.zipWithIndex.foreach { case (s, i) =>
  s.shape.out ~> merge.in(i)
}

Is it possible to merge an arbitrary number of Sources in Akka streams Graph DSL? If so, why isn't my attempt successful?

Answers (2)

Astrid

Your code runs without the error if I replace

  sources.zipWithIndex.foreach { case (s, i) =>
    s.shape.out ~> merge.in(i)
  }

with

  sources.zipWithIndex.foreach { case (s, i) =>
    s ~> merge.in(i)
  }

I admit I'm not quite sure why! At any rate, the out port of s.shape belongs to a StatefulMapConcat stage, and that is the port it complains is already connected. The problem occurs even if you pass only a single source, so the arbitrary number of sources isn't the problem.
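A likely explanation: in the Graph DSL, `s ~> merge.in(i)` goes through an implicit conversion that first imports `s` into the builder (`b.add(s)`) and only then wires the resulting outlet. `s.shape.out`, by contrast, is a port of a graph that was never added to this builder, which appears to be why the builder rejects it. The following is a minimal sketch that makes the import explicit, assuming the Akka 2.5-era API used in the question (`ActorMaterializer`); the object and method names are hypothetical. Because `MergePrioritized` uses the priorities as weights when several inputs have elements ready, the interleaving is not fixed, so only the set of elements is predictable.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import akka.stream.scaladsl.{GraphDSL, MergePrioritized, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PrioritizedMergeSketch {
  // Hypothetical helper: merges any number of sources into one sink with MergePrioritized.
  // Priorities must be positive; a higher value means that input is picked more often.
  def prioritizedMerge[T, M](
    sources: Seq[Source[T, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[T, M]
  ): RunnableGraph[M] = {
    require(sources.size == priorities.size, "Each source should have a priority")
    RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => sinkShape =>
      import GraphDSL.Implicits._
      val merge = b.add(MergePrioritized[T](priorities))
      sources.zipWithIndex.foreach { case (s, i) =>
        val imported = b.add(s)       // import the source blueprint into this builder
        imported.out ~> merge.in(i)   // its outlet is now known to the builder
      }
      merge.out ~> sinkShape          // keep the sink's materialized value (M)
      ClosedShape
    })
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("PrioritizedMergeSketch")
    implicit val mat: Materializer = ActorMaterializer()
    val sources = Seq(
      Source(List("message1", "message2", "message3")),
      Source(List("message4", "message5", "message6")),
      Source(List("message7", "message8", "message9"))
    )
    val done: Future[immutable.Seq[String]] =
      prioritizedMerge(sources, Seq(3, 2, 1), Sink.seq[String]).run()
    // The interleaving depends on the weights and timing; all nine messages arrive.
    println(Await.result(done, 5.seconds).mkString(", "))
    system.terminate()
  }
}
```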

Primary Problem with Code Example

One big issue with the code snippet provided in the question is that source1 is connected both to the Sink passed to the merge call and to Sink.foreach(println). The same Source cannot be connected to multiple Sinks without an intermediate fan-out element.

Removing the Sink.foreach(println) may solve your problem outright.
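Within a single graph an outlet can be wired to only one inlet, so feeding one source into two sinks there needs a fan-out stage such as `Broadcast`. A minimal sketch, assuming the Akka 2.5-era API used in the question; `BroadcastSketch` and `collectAndPrint` are hypothetical names.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastSketch {
  // Hypothetical helper: one source feeds both a collecting sink and a printing sink.
  def collectAndPrint(
    source: Source[String, NotUsed]
  ): RunnableGraph[(Future[immutable.Seq[String]], Future[Done])] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[String], Sink.foreach[String](println))((_, _)) {
      implicit b => (collect, printer) =>
        import GraphDSL.Implicits._
        val bcast = b.add(Broadcast[String](2)) // one input, two outputs
        source ~> bcast
        bcast.out(0) ~> collect
        bcast.out(1) ~> printer
        ClosedShape
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("BroadcastSketch")
    implicit val mat: Materializer = ActorMaterializer()
    val source1 = Source(List("message1", "message2", "message3"))
    val (collected, printed) = collectAndPrint(source1).run()
    println(Await.result(collected, 5.seconds).mkString(", "))
    Await.result(printed, 5.seconds)
    system.terminate()
  }
}
```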

Simplified Design

The merging can be simplified based on the fact that all messages from a particular Source have the same priority. This means that you can sort the sources by their respective priority and then concatenate them all together:

private def partial(sources: Seq[Source[java.io.Serializable, NotUsed]],
                    priorities: Seq[Int],
                    sink: Sink[java.io.Serializable, _]): RunnableGraph[NotUsed] =
   sources.zip(priorities)
          .sortWith(_._2 < _._2)                          // lowest priority number first
          .map(_._1)                                      // keep only the sources
          .reduceOption(_ ++ _)                           // concatenate: each source is drained before the next
          .getOrElse(Source.empty[java.io.Serializable])  // no sources: an empty stream
          .to(sink)
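A self-contained usage sketch of the same sort-and-concatenate idea, assuming the Akka 2.5-era API used in the question. It is generic over the element type and keeps the sink's materialized value via `Keep.right`, so the collected result can be awaited; `ConcatByPrioritySketch` and `concatByPriority` are hypothetical names. Note that concatenation emits every element of one source before starting the next, which is stricter than `MergePrioritized`, where priorities only act as weights when several inputs have elements ready.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ConcatByPrioritySketch {
  // Hypothetical helper: sort sources by priority (lowest number first) and concatenate them.
  def concatByPriority[T, M](
    sources: Seq[Source[T, NotUsed]],
    priorities: Seq[Int],
    sink: Sink[T, M]
  ): RunnableGraph[M] =
    sources.zip(priorities)
      .sortBy(_._2)                 // stable sort on the priority number
      .map(_._1)
      .reduceOption(_ ++ _)         // ++ is concat: drains the left source before the right
      .getOrElse(Source.empty[T])   // no sources: an empty stream
      .toMat(sink)(Keep.right)      // keep the sink's materialized value

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ConcatByPrioritySketch")
    implicit val mat: Materializer = ActorMaterializer()
    val sources = Seq(
      Source(List("low1", "low2")),
      Source(List("high1", "high2")),
      Source(List("mid1", "mid2"))
    )
    val done: Future[immutable.Seq[String]] =
      concatByPriority(sources, Seq(3, 1, 2), Sink.seq[String]).run()
    // prints: high1, high2, mid1, mid2, low1, low2
    println(Await.result(done, 5.seconds).mkString(", "))
    system.terminate()
  }
}
```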
