Reputation: 7374
I have a TCP connection in Akka Stream that ends in a Sink. Right now all messages go into one Sink. I want to split the stream into an unknown number of Sinks given some function.
The use case is as follows: from the TCP connection I get a continuous stream of something like List[DeltaValue], and I want to create an actorSink for each DeltaValue.id so that I can continuously accumulate and implement behaviour for each DeltaValue.id. I find this to be a standard use case in stream processing, but I'm not able to find a good example with Akka Stream.
This is what I have right now:
def connect(): ActorRef = tcpConnection
  // SOMEHOW SPLIT HERE and create a ReceiverActor for each message
  .to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
  .run()
Update: I now have this, not sure what to say about it, it does not feel super stable but it should work:
private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
  implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
  system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
    case Success(actorRef) => actorRef ! m
    case Failure(ex) => (system.actorOf(ReceiverActor.props(), m.id.toString)) ! m
  }
}

def connect(): ActorRef = tcpConnection
  .to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
  .run()
Upvotes: 1
Views: 1751
Reputation: 9023
The below should be a somewhat improved version of what was updated in the question. The main improvement is that your actors are kept in a data structure to avoid actorSelection resolution for every incoming message.
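import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.scaladsl.Source

case class DeltaValue(id: String, value: Double)
val src: Source[DeltaValue, NotUsed] = ???
src.runFold(Map[String, ActorRef]()) {
  case (actors, elem) if actors.contains(elem.id) ⇒
    // an actor for this id already exists: just forward the value
    actors(elem.id) ! elem.value
    actors
  case (actors, elem) ⇒
    // name the actor after its id; assuming ReceiverActor.name is a constant,
    // reusing it for every actor would fail with InvalidActorNameException
    val newActor = system.actorOf(ReceiverActor.props(), elem.id)
    newActor ! elem.value
    actors.updated(elem.id, newActor)
}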
Keep in mind that, when you integrate Akka Streams with bare actors, you lose backpressure support. This is one of the reasons why you should try to implement your logic within the boundaries of Akka Streams whenever possible. That is not always possible, e.g. when remoting is needed.
In your case, you could consider leveraging groupBy and the concept of substreams. The example below folds the elements of each substream by summing them, just to give an idea:
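src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
  .fold("" → 0d) {
    // take the id from the element (the seed id is empty) and parenthesize the sum,
    // otherwise the expression parses as (id → delta.value) + acc
    case ((_, acc), delta) ⇒ delta.id → (delta.value + acc)
  }
  .mergeSubstreams
  .runForeach(println)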
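One thing to watch with fold: it emits its result only when the substream completes, so on a never-ending TCP stream nothing would be printed. A possible variation is scan, which emits every intermediate total. The sketch below is only an illustration under some assumptions: Akka 2.6+ (where an implicit ActorSystem supplies the materializer), a finite list standing in for the TCP source, and made-up names (RunningTotals, runningTotals) and an arbitrary limit of 16 substreams.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RunningTotals {
  final case class DeltaValue(id: String, value: Double)

  // Hypothetical helper: returns every intermediate (id, total) pair.
  // It blocks on the result only to keep the sketch short.
  def runningTotals(deltas: List[DeltaValue])(implicit system: ActorSystem): Seq[(String, Double)] = {
    val result = Source(deltas)
      .groupBy(16, _.id) // one substream per id; 16 is an arbitrary upper bound
      .scan(("", 0d)) { case ((_, acc), delta) => (delta.id, acc + delta.value) }
      .drop(1) // scan emits its seed first; skip it
      .mergeSubstreams
      .runWith(Sink.seq)
    Await.result(result, 3.seconds)
  }
}
```

Order is preserved within one id, but the interleaving between different ids after mergeSubstreams is not deterministic.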
Upvotes: 1
Reputation: 17923
EventStream
You can send messages to the ActorSystem's EventStream within a stream sink and separately have the Actors subscribe to the stream.
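A rough sketch of how this could look, assuming Akka 2.6+ (implicit ActorSystem as materializer). TotalActor, GetTotal, subscribe and publishAll are hypothetical names made up for the illustration, not part of Akka. Note that every subscriber receives every DeltaValue and has to filter on its own id, and that publishing to the EventStream gives no backpressure.

```scala
import akka.Done
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object EventStreamSplit {
  final case class DeltaValue(id: String, value: Double)
  case object GetTotal // hypothetical query message

  // Hypothetical actor: keeps a running total for one id and ignores the others.
  class TotalActor(id: String) extends Actor {
    private var total = 0d
    def receive: Receive = {
      case DeltaValue(`id`, value) => total += value
      case _: DeltaValue           => // belongs to another id
      case GetTotal                => sender() ! total
    }
  }

  // Creates the actor and subscribes it to all DeltaValue events.
  def subscribe(id: String)(implicit system: ActorSystem): ActorRef = {
    val ref = system.actorOf(Props(new TotalActor(id)), s"total-$id")
    system.eventStream.subscribe(ref, classOf[DeltaValue])
    ref
  }

  // The stream side: the sink only publishes, it knows nothing about the actors.
  def publishAll(deltas: List[DeltaValue])(implicit system: ActorSystem): Future[Done] =
    Source(deltas).runWith(Sink.foreach[DeltaValue](d => system.eventStream.publish(d)))
}
```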
Split At Stream Level
You can split the stream at the stream level using Broadcast. The documentation has a good example of this.
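For illustration, a minimal GraphDSL sketch of the Broadcast approach, assuming Akka 2.6+ and two ids ("a" and "b") that are known up front. BroadcastSplit and splitByKnownIds are invented names. Broadcast has a fixed number of outputs and sends every element to all of them, so each branch filters for its own id; for an unknown set of ids this does not scale the way groupBy does. Newer 2.6 releases deprecate this GraphDSL.create overload in favour of createGraph.

```scala
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

object BroadcastSplit {
  final case class DeltaValue(id: String, value: Double)

  // Hypothetical helper: builds a graph that materializes one Future per branch,
  // each holding the elements that branch kept.
  def splitByKnownIds(deltas: List[DeltaValue]) = {
    val collectA = Sink.seq[DeltaValue]
    val collectB = Sink.seq[DeltaValue]
    RunnableGraph.fromGraph(GraphDSL.create(collectA, collectB)((_, _)) { implicit builder => (outA, outB) =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[DeltaValue](2))
      Source(deltas) ~> broadcast.in
      // every branch sees every element, so each one filters for its id
      broadcast ~> Flow[DeltaValue].filter(_.id == "a") ~> outA.in
      broadcast ~> Flow[DeltaValue].filter(_.id == "b") ~> outB.in
      ClosedShape
    })
  }
}
```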
Split At Actor Level
You could also use Sink.actorRef in combination with a BroadcastPool to broadcast the messages to multiple Actors.
Upvotes: 1