Reputation: 65
I'm trying out Akka Stream API and I have no idea why this throws java.lang.IllegalArgumentException: [Partition.in] is already connected in line 5
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val intSource = Source.fromIterator(() => Iterator.continually(Random.nextInt(10).toString))
val validateInput: Flow[String, Message, NotUsed] = Flow[String].map(Message.fromString)
val validationPartitioner = Partition[Message](2, { // #5 error here
case _: Data => 0
case _ => 1
})
val outputStream = Sink.foreach[Message](println(_))
val errorStream = Sink.ignore
intSource ~> validateInput ~> validationPartitioner.in
validationPartitioner.out(0) ~> outputStream
validationPartitioner.out(1) ~> errorStream
ClosedShape
})
but if I change validationPartitioner to be wrapped in builder.add(...) and remove .in from
intSource ~> validateInput ~> validationPartitioner.in
Everything works. If I just remove .in the code doesn't compile. Why usage of builder is being forced and am I missing something or is it a bug?
Upvotes: 1
Views: 712
Reputation: 19507
All of the components of a graph must be added to the builder, but there are variants of the ~>
operator that add the most commonly used components, such as Source
and Flow
, to the builder under the covers (see here and here). However, junction operations that perform a fan-in (such as Merge
) or a fan-out (such as Partition
) must be explicitly passed to builder.add
if you're using the Graph DSL.
Upvotes: 2