Reputation: 11
My question is somewhat related to "Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef", with some differences:
I can't find a way to get an ActorRef that lets me send a message to the ActorPublisher instance held by the Source.
    def run(implicit system: ActorSystem) = {
      import system.dispatcher
      implicit val materializer = ActorMaterializer()

      val source = Source.actorPublisher[TestRequest](TestActor.props).map { request => request.event }

      //Implementation in subpackage
      val sinkLevel1 = Sinks.sinkLevel1
      val sinkLevel2 = Sinks.sinkLevel2

      //Implementation in subpackage
      val stageTriage = FlowStages.stageTriage
      val stageEvalProcess1 = FlowStages.stageEvalProcess1
      val stageEvalProcess2 = FlowStages.stageEvalProcess2

      val pipeline = FlowGraph.closed(){ implicit builder =>
        import FlowGraph.Implicits._

        val stageDispatchByRuleLevels = builder.add(Broadcast[TriagedSystemEvent](2))

        source ~> stageTriage ~> stageDispatchByRuleLevels
        stageDispatchByRuleLevels ~> stageEvalProcess1 ~> sinkLevel1
        stageDispatchByRuleLevels ~> stageEvalProcess2 ~> sinkLevel2
      }

      pipeline.run()
    }
Thanks for your help!
Oliver
Upvotes: 1
Views: 229
Reputation: 21
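Based on Noah's answer in the linked question, capture the value returned when you run the graph:

    val ref = pipeline.run()

You can then send messages to ref, like

    ref ! ...

Note that run() only returns the ActorRef if the graph exposes the source's materialized value, i.e. the source is passed in as FlowGraph.closed(source) { ... }. With FlowGraph.closed() as written in the question, run() returns Unit.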
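
To make the materialized-value point concrete, here is a minimal sketch, assuming Akka Streams 1.0 (the FlowGraph.closed / ActorMaterializer API used in the question). DemoPublisher, MaterializedRefSketch and the String element type are hypothetical stand-ins for TestActor, the run method and TestRequest, and the two sinks just replace the question's stages. It is an illustration of the idea, not the asker's actual pipeline.

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{Broadcast, FlowGraph, RunnableGraph, Sink, Source}

// Hypothetical stand-in for the question's TestActor.
class DemoPublisher extends ActorPublisher[String] {
  def receive = {
    // Emit only while downstream has signalled demand;
    // in this sketch any other message is simply left unhandled.
    case msg: String if isActive && totalDemand > 0 => onNext(msg)
  }
}

object MaterializedRefSketch {
  def run(implicit system: ActorSystem): ActorRef = {
    implicit val materializer = ActorMaterializer()

    // map keeps the source's materialized value (the ActorRef).
    val source: Source[String, ActorRef] =
      Source.actorPublisher[String](Props[DemoPublisher]).map(_.toUpperCase)

    // Passing the source to closed(...) makes its materialized value
    // the materialized value of the whole graph.
    val pipeline: RunnableGraph[ActorRef] =
      FlowGraph.closed(source) { implicit builder => src =>
        import FlowGraph.Implicits._

        val dispatch = builder.add(Broadcast[String](2))

        src.outlet ~> dispatch.in
        dispatch.out(0) ~> Sink.foreach[String](println)
        dispatch.out(1) ~> Sink.ignore
      }

    // run() now returns the ActorRef of the DemoPublisher instance.
    pipeline.run()
  }
}
```

With an implicit ActorSystem in scope, val ref = MaterializedRefSketch.run gives a reference you can message with ref ! "hello". Because the publisher in this sketch drops messages that arrive before downstream demand, a real actor would probably buffer them instead.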
Upvotes: 1