Stephen Woods

Reputation: 151

Dynamically creating Akka Stream Flows at Runtime

I'm currently trying to dynamically create Akka Stream graph definitions at runtime. The idea is that users will be able to define flows interactively and attach them to existing/running BroadcastHubs. This means that, at compile time, I don't know which flows, or even how many, will be used.

Unfortunately, I'm struggling with generics/type erasure. Frankly, I'm not even sure what I'm attempting to do is possible on the JVM.

I have a function that will return an Akka Streams Flow representing two connected Flows. It uses Scala's TypeTags to get around type erasure. If the output type of the first flow is the same as the input type of the second flow, it can be successfully connected. This works just fine.

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Flow, GraphDSL}

import scala.reflect.runtime.universe._
import scala.util.{Failure, Success, Try}

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
                                                            b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = {
  Try {
    if (typeOf[B] =:= typeOf[C]) {
      val c = b.asInstanceOf[Flow[B, D, NotUsed]]

      Flow.fromGraph {
        GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) { implicit b =>
          (s1, s2) =>
            s1 ~> s2
            FlowShape(s1.in, s2.out)
        }
      }
    }
    else
      throw new RuntimeException(s"Connection failed. Incompatible types: ${typeOf[B]} and ${typeOf[C]}")
  }
}

So if I have Flow[A,B] and Flow[C,D], the result would be Flow[A,D], assuming that B and C are the same type.

I also have a function that attempts to merge/reduce a List of Flows down to a single Flow. Let's assume that this list is derived from a list of flow definitions from a file or web request.

def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = {
  fcs match {
    case Nil => Success(None)
    case h :: Nil => Success(Some(h))
    case h :: t =>
      val n = t.head

      connect(h, n) match {
        case Success(fc) => merge(fc :: t)
        case Failure(e) => Failure(e)
      }
  }
}

Unfortunately, since the Flows are stored inside a List, due to type erasure on standard Lists, I lose all of the type information and therefore am unable to connect the Flows at runtime. Here's an example:

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)

def flowI2S() = Flow.fromFunction[Int, String](_.toString)

val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()

val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)

val y = merge(fcs)

This results in the exception:

Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)

I've been looking into Miles Sabin's Shapeless, and thought I might be able to use HLists to retain type information. Unfortunately, that seems to work only if I know the individual types and length of the list at compile time. If I upcast a specific HList to just HList, it looks like I lose the type information again.

val fcs: HList = a :: b :: c :: d :: HNil

So my question is... is this even possible? Is there a way to do this with Shapeless generics magic (preferably without the need to use specific non-existential type extractors)? I'd like to find as generic a solution as possible, and any help would be appreciated.

Thanks!

Upvotes: 14

Views: 1107

Answers (2)

Johny T Koshy

Reputation: 3887

I know this is an old post, but I had some time and gave it a try. I'm not sure this is exactly the solution, but I thought I would post it and get suggestions.

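  import akka.NotUsed
  import akka.stream.scaladsl.{Flow, Source}
  import shapeless.{::, HList, HNil, Lazy, Typeable}
  import shapeless.ops.hlist.IsHCons

  type FlowN[A, B] = Flow[A, B, NotUsed]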

  trait FlowMerger[L <: HList] {
    type A
    type D
    def merge(flow: L): Option[FlowN[A, D]]
  }

  object FlowMerger extends LowPriorityImplicits {
    def apply[L <: HList](v: L)(implicit ev: FlowMerger[L]): Option[FlowN[ev.A, ev.D]] = ev.merge(v)

    type Aux[L <: HList, A1, D1] = FlowMerger[L] {
      type A = A1
      type D = D1
    }

    implicit def h1Instance[A1, D1]: FlowMerger.Aux[FlowN[A1, D1] :: HNil, A1, D1] = new FlowMerger[FlowN[A1, D1] :: HNil] {
      override type A = A1
      override type D = D1

      override def merge(flow: FlowN[A1, D1] :: HNil): Option[FlowN[A, D]] = Option(flow.head)
    }

  }

  trait LowPriorityImplicits {
    implicit def hMulInstance[A1, B1, D1, E1, F1, L <: HList, T <: HList, T1 <: HList]
     (implicit
     isHC1: IsHCons.Aux[L, FlowN[A1, B1], T],
     isHC2: IsHCons.Aux[T, FlowN[E1, F1], T1],
     lx: Lazy[FlowMerger[T]],
     typeableB: Lazy[Typeable[B1]],
     typeableE: Lazy[Typeable[E1]]
    ): FlowMerger.Aux[L, A1, D1] = {
      new FlowMerger[L] {
        override type A = A1
        override type D = D1

        override def merge(flow: L): Option[FlowN[A, D]] = {
          if (typeableB.value == typeableE.value) {
            lx.value.merge(isHC1.tail(flow)).map(t => isHC1.head(flow) via t.asInstanceOf[FlowN[B1, D]])
          } else None
        }
      }
    }
  }

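You can use it as follows, where fcs is a statically typed HList such as a :: b :: c :: d :: HNil (not upcast to plain HList) and an implicit materializer is in scope: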

  FlowMerger(fcs).map(flow => Source(List(1, 2, 3)) via flow runForeach println)

Upvotes: 0

Tomer Shetah

Reputation: 8529

As you already noticed, it didn't work because the list erases the types you had, so in the general case the flows' type parameters cannot be recovered. If you know all of the types that can be used as intermediate types, however, you can work around that by adding a resolving function. Adding such a function will also simplify your connect method. Here is a code snippet; I hope it is clear.

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)

def main(args: Array[String]): Unit = {
    val idInt1 = flowIdentity[Int]()
    val idInt2 = flowIdentity[Int]()
    val int2String = flowI2S()
    val idString = flowIdentity[String]()
    val fcs = List(idInt1, idInt2, int2String, idString)

    val source = Source(1 to 10)
    val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
    // run() needs an implicit Materializer in scope (on Akka 2.6+, an implicit ActorSystem is enough)
    source.via(mergedGraph).to(Sink.foreach(println)).run()
}

def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] = {
    fcs match {
      case Nil => None
      case h :: Nil => Some(h)
      case h :: t =>
        val n = t.head

        val fc = resolveConnect(h, n)
        merge(fc :: t.tail)
    }
}

// Picks the connect function for each supported intermediate type.
// Note: the type arguments are erased at runtime, so these isInstanceOf checks
// are unchecked and only verify that a and b are Flows.
def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = {
    if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) {
      connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
    } else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) {
      connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
    } else {
      throw new UnsupportedOperationException
    }
}

def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] = {
    a.via(b)
}

def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] = {
    a.via(b)
}

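P.S.

There is another bug hiding in your original merge: an endless loop. The recursive call passes fc :: t, but t still starts with n, the flow that was just merged into fc, so the list never gets shorter. The first element of t should be dropped, i.e. the call should be merge(fc :: t.tail).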
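
If hard-coding every intermediate type is not an option, one possible variation is to capture the TypeTags when each flow is created, while its types are still statically known, and store them next to the flow. The following is only a minimal sketch for Scala 2 under that assumption; TypedFlow, TypedFlow.of and connectTo are hypothetical names introduced for illustration and are not part of Akka.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

import scala.reflect.runtime.universe._

// Hypothetical wrapper (not part of Akka): a flow plus the runtime types of
// its input and output, captured while they are still statically known.
final case class TypedFlow(in: Type, out: Type, flow: Flow[Any, Any, NotUsed]) {
  // Connects this flow to `next` when the types line up; Left holds the error.
  def connectTo(next: TypedFlow): Either[String, TypedFlow] =
    if (out =:= next.in) Right(TypedFlow(in, next.out, flow.via(next.flow)))
    else Left(s"Connection failed. Incompatible types: $out and ${next.in}")
}

object TypedFlow {
  // The TypeTags are resolved here, before the flow goes into a List.
  def of[A: TypeTag, B: TypeTag](f: Flow[A, B, NotUsed]): TypedFlow =
    TypedFlow(typeOf[A], typeOf[B], f.asInstanceOf[Flow[Any, Any, NotUsed]])
}

// Folds the list from left to right, stopping at the first type mismatch.
def merge(flows: List[TypedFlow]): Either[String, Option[TypedFlow]] =
  flows match {
    case Nil => Right(None)
    case head :: tail =>
      val start: Either[String, TypedFlow] = Right(head)
      tail.foldLeft(start)((acc, next) => acc.flatMap(_.connectTo(next))).map(Some(_))
  }

val fcs = List(
  TypedFlow.of(Flow.fromFunction[Int, Int](x => x)),
  TypedFlow.of(Flow.fromFunction[Int, String](_.toString)),
  TypedFlow.of(Flow.fromFunction[String, String](x => x))
)

val merged = merge(fcs)
```

The merged flow still has to be cast back before it can be used with a typed Source, for example with asInstanceOf[Flow[Int, String, NotUsed]], but the in and out fields of the result tell you at runtime which cast is valid. The casts to Flow[Any, Any, NotUsed] are unchecked, so the safety of this approach rests entirely on the =:= comparison in connectTo.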

Upvotes: -1
