Oliver

Reputation: 11

Get actorRef in akka-stream 1.0 when using Source.actorPublisher and a FlowGraph

My question is somewhat related to "Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef", with some differences:

  1. I'm using akka-stream experimental 1.0
  2. I'm using the actorPublisher model
  3. I'm using the FlowGraph DSL to define the stream, with parallel processing

I can't find a way to get an ActorRef that lets me send messages to the ActorPublisher instance held by the Source.

 def run(implicit system: ActorSystem) = {
   import system.dispatcher
   implicit val materializer = ActorMaterializer()

   val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

   //Implementation in subpackage
   val sinkLevel1 = Sinks.sinkLevel1 
   val sinkLevel2 = Sinks.sinkLevel2

   //Implementation in subpackage
   val stageTriage = FlowStages.stageTriage    
   val stageEvalProcess1 = FlowStages.stageEvalProcess1
   val stageEvalProcess2 = FlowStages.stageEvalProcess2

   val pipeline = FlowGraph.closed(){ implicit builder => 
     import FlowGraph.Implicits._

     val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

     source ~> stageTriage ~> stageDispatchByRuleLevels
                              stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
                              stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2

   }

   pipeline.run()

 }

Thanks for the help!

Oliver

Upvotes: 1

Views: 229

Answers (1)

yardena

Reputation: 21

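Based on Noah's answer in the linked question: the materialized value of the graph has to be the ActorRef of the publisher. With FlowGraph.closed() as in your code the graph materializes to Unit, so pass the source in as an argument, i.e. FlowGraph.closed(source) { implicit builder => src => ... }, and wire src inside the block instead of source. If you then add

val ref = pipeline.run()

you can send messages to ref, like

ref ! ...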
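
A minimal sketch of getting the ref out of a FlowGraph, assuming akka-stream-experimental 1.0 is on the classpath. EchoPublisher, the String element type and the println sinks are hypothetical stand-ins for TestActor, TestRequest and the Sinks/FlowStages objects from the question, which are not shown there. The source is handed to FlowGraph.closed as an argument so that run() returns its ActorRef.

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.{Broadcast, FlowGraph, Sink, Source}

// Hypothetical stand-in for TestActor: forwards each String it receives
// while there is downstream demand, and silently drops it otherwise.
class EchoPublisher extends ActorPublisher[String] {
  def receive = {
    case s: String if isActive && totalDemand > 0 => onNext(s)
    case Request(_) => // demand is tracked by ActorPublisher itself
    case Cancel     => context.stop(self)
  }
}

object Demo extends App {
  implicit val system = ActorSystem("demo")
  implicit val materializer = ActorMaterializer()

  // map keeps the ActorRef as the materialized value of the source
  val source = Source.actorPublisher[String](Props[EchoPublisher]).map(_.toUpperCase)

  // Importing `source` into the graph makes its materialized value
  // the materialized value of the whole RunnableGraph.
  val pipeline = FlowGraph.closed(source) { implicit builder => src =>
    import FlowGraph.Implicits._

    val bcast = builder.add(Broadcast[String](2))

    src.outlet ~> bcast.in
    bcast.out(0) ~> Sink.foreach[String](s => println(s"level1: $s"))
    bcast.out(1) ~> Sink.foreach[String](s => println(s"level2: $s"))
  }

  val ref: ActorRef = pipeline.run()
  ref ! "hello"
}
```

In this sketch a message that arrives before the sinks have signalled demand is dropped, so a real publisher would probably buffer incoming messages. The ActorSystem also keeps running until it is shut down explicitly.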

Upvotes: 1
