Reputation:
I have an Akka stream in which I use an ADT of the following form:
sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message
Now I have a ThisMessage handler Flow and a ThatMessage handler Flow, plus an inlet flow that accepts type Message.
To split the stream between the two handlers, I have the following definition for a partitioner function:
/**
* Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
*
* @tparam A type of input
* @tparam B type of output on the first outlet.
* @tparam C type of output on the second outlet.
*
* @return A partition stage
*/
def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
  GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
    import GraphDSL.Implicits._
    // This is wrong, but I have no idea how to write this.
    val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
      case _: B => 0
      case _: C => 1
    }))
    new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
  }
I wish to use the above method, and use the ADT in the type params to initialize a partitioner.
The compiler throws this error.
Error:(63, 7) type mismatch;
 found   : akka.stream.FanOutShape2[A,A,A]
 required: akka.stream.FanOutShape2[A,B,C]
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
From what I understand, the Partition object is parameterized only by the type of its Inlet (in this case A, a parameterized type), so its outlets come out as A as well.
Does anyone have any ideas on how I can fix this?
Upvotes: 1
Views: 601
Reputation: 22439
Here's one way to instantiate a FanOutShape2[A, B <: A, C <: A] from the UniformFanOutShape[A, A] generated by builder.add(Partition[A]()):
import akka.stream.scaladsl._
import akka.stream.{Graph, FanOutShape2}
import akka.NotUsed
import scala.reflect.ClassTag
def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
  GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[A](2, {
      case _: B => 0
      case _: C => 1
    }))
    val partitionB = builder.add(Flow[A].collect{ case b: B => b })
    val partitionC = builder.add(Flow[A].collect{ case c: C => c })
    partitioner.out(0) ~> partitionB
    partitioner.out(1) ~> partitionC
    new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
  }
// binaryPartitionByType: [A, B <: A, C <: A]()(
// implicit evidence$1: scala.reflect.ClassTag[B], implicit evidence$2: scala.reflect.ClassTag[C]
// ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]
Note that the ClassTag context bounds are needed so that the type patterns on B and C can still be checked at runtime despite type erasure.
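To make the erasure point concrete, here is a minimal sketch in plain Scala (no Akka involved). The names ClassTagDemo, Ping, Pong, route and routeErased are hypothetical and exist only for this illustration. It compares a type-pattern match with and without ClassTag context bounds; the version without them should compile with an "unchecked" warning and always take the first case.

```scala
import scala.reflect.ClassTag

object ClassTagDemo {
  sealed trait Message
  sealed trait ThisMessage extends Message
  sealed trait ThatMessage extends Message
  // Hypothetical concrete messages, added only for this illustration.
  case object Ping extends ThisMessage
  case object Pong extends ThatMessage

  // With ClassTag context bounds, `case _: B` becomes a real runtime class check.
  def route[A, B <: A: ClassTag, C <: A: ClassTag](a: A): Int = a match {
    case _: B => 0
    case _: C => 1
  }

  // Without ClassTag, B is erased, so the first pattern matches any non-null value.
  // The compiler reports an "unchecked" warning for these patterns.
  def routeErased[A, B <: A, C <: A](a: A): Int = a match {
    case _: B => 0
    case _: C => 1
  }

  val checkedThis: Int = route[Message, ThisMessage, ThatMessage](Ping)       // 0
  val checkedThat: Int = route[Message, ThisMessage, ThatMessage](Pong)       // 1
  val erasedThat: Int  = routeErased[Message, ThisMessage, ThatMessage](Pong) // 0, falls into the first case
}
```

The same reasoning applies to the partitioner function and to the two collect stages in the answer above: without the ClassTag bounds, every element would be routed to outlet 0.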
Upvotes: 0
Reputation: 23788
The problem is that you are trying to subvert the type system. UniformFanOutShape is named "uniform" because all of its outputs are of the same type. If that were not the case, you wouldn't need to create an additional FanOutShape2 in the first place. If you are going to subvert the type system, you should do it consistently, so you should change the type of the Outlets as well. Try something like this:
new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])
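For context, the line above could sit in a complete definition roughly like the sketch below. Several things here are assumptions rather than part of the answer: the name binaryPartitionByTypeCast, the wrapper object, and the Ping/Pong messages are hypothetical, and the ClassTag bounds are borrowed from the other answer so that the partitioner function can actually tell B from C at runtime. The as[...] calls are unchecked casts on the outlets (and, as far as I can tell, as is flagged as internal API in the Akka source), so the result is only sound because the partitioner function sends B elements to outlet 0 and C elements to outlet 1.

```scala
import akka.NotUsed
import akka.stream.{ClosedShape, FanOutShape2, Graph}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.reflect.ClassTag

object CastPartitionSketch {
  sealed trait Message
  sealed trait ThisMessage extends Message
  sealed trait ThatMessage extends Message
  // Hypothetical concrete messages, added only for this illustration.
  case object Ping extends ThisMessage
  case object Pong extends ThatMessage

  // Hypothetical name; same shape as the question's method, but with cast outlets.
  def binaryPartitionByTypeCast[A, B <: A: ClassTag, C <: A: ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
    GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
      import GraphDSL.Implicits._
      // The ClassTags make these type patterns real runtime checks.
      val partitioner = builder.add(Partition[A](2, {
        case _: B => 0
        case _: C => 1
      }))
      // Unchecked casts: Outlet[A] is re-typed as Outlet[B] and Outlet[C].
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])
    }

  // Wiring sketch: builds a closed graph blueprint but does not run it.
  val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val split = builder.add(binaryPartitionByTypeCast[Message, ThisMessage, ThatMessage]())
    Source(List[Message](Ping, Pong)) ~> split.in
    split.out0 ~> Sink.foreach[ThisMessage](m => println(s"this: $m"))
    split.out1 ~> Sink.foreach[ThatMessage](m => println(s"that: $m"))
    ClosedShape
  })
}
```

Running the graph would additionally need an implicit ActorSystem (or an explicit materializer on older Akka versions) in scope. Compared with the collect-based variant, this avoids two extra stages at the price of giving up compile-time checking of the outlet types.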
Upvotes: 1