phantomastray
phantomastray

Reputation: 449

Using supervision strategy with GraphStage doesn't work

I am trying to recover using Supervision stategy, it works when I write Flow stages using map, but if I am using graph stage, it is never caught, and the entire pipeline fails

    object  test extends App{

      val stageSupervisionDecider: Supervision.Decider = {
        case cEx: IllegalArgumentException =>
          println("Supervision Catch")
          Supervision.Resume
        case _ => Supervision.Stop
      }

      implicit val system = ActorSystem("system")

      implicit val materializer = ActorMaterializer(
        ActorMaterializerSettings(system)
          .withSupervisionStrategy(stageSupervisionDecider)
      )

      Source(Vector(1,2,3,4,5,6,7))
        .via(new FailFlow)
        .runWith(Sink.foreach(println))
    }


    class FailFlow extends GraphStage[FlowShape[Int, Int]] {

      val in = Inlet[Int]("FailFlow.In")
      val out = Outlet[Int]("FailFlow.Out")

      override def shape: FlowShape[Int, Int] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
        new GraphStageLogic(shape) {
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              val m = grab(in)
              if(m % 2 == 0)
                throw new IllegalArgumentException("illegal value")
              else
              push(out,m)
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
                pull(in)
            }
          })
        }
      }
    }

Any ideas what is happening wrong here?

Upvotes: 2

Views: 347

Answers (1)

lpiepiora
lpiepiora

Reputation: 13749

According to the documentation (big red box):

ZipWith, GraphStage junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet

Upvotes: 2

Related Questions