fommil
fommil

Reputation: 5885

Integrating an ack-based Actor with akka-stream

I have an Actor which has been designed to work with akka-io acking, such that it will wait for an Ack when sending messages upstream (to the network). This actor is the interface to an async application in the backend.

I'd like to have a wrapper layer which allows me to convert this Actor into an akka-streams Flow[Incoming, Outgoing, ???] so that it can be integrated with newer libraries that expect such a signature.

(Incoming messages from upstream are rare, so we don't care too much about backpressuring there but it wouldn't be a bad thing to have it.)

sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack

// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
  def receive = {
    case in: Incoming if sender() == upstream =>
       // does some work in response to upstream
    case other =>
       // does some work in response to downstream
       // including sending messages to upstream and
       // `becoming` a stashing state waiting for Ack
       // to `unbecome`, then sending Ack downstream
       // (which will respect the backpressure).
  }
}

I have it on good authority from the akka-user mailing list that there is no code in akka-streams that integrates actors with streams, and in order to plug an Actor into a Stream and preserve Ack-based backpressure, one would have to implement PushPullStage.

It seems that we'd actually need two PushPullStages here... one for upstream => SimpleActor and one for SimpleActor => upstream.

My questions are:

  1. Are there any libraries that offer integrations such as this between actors and streams?
  2. Is there a simpler way of doing it than implementing bi-directional PushPullStage from scratch?
  3. Is there any existing test framework that would allow such an implementation to be stress tested?

Upvotes: 0

Views: 1357

Answers (2)

mandubian
mandubian

Reputation: 4427

I think the philosophy of akka-stream is to provide low-level bricks and build higher-level tools on top of them. If you look at our recently released opensource library https://github.com/MfgLabs/akka-stream-extensions, you'll see that we have done that exactly. We provide some useful structures to make it easier to manage rate limiters, stateful processors, laziness & generators etc... For actor integration, I think it should be possible to create some kind of helpers to make it easier to integrate actors with akka-stream trying to propagate backpressure. Akka-Stream is still young and the ecosystem keeps growing ;)

Upvotes: 5

Quizzie
Quizzie

Reputation: 879

Yes, you can integrate actors with streams.
There are special actors for this purpose: the actor publisher and actor subscriber.

It's all in here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

Of course you have to write the actor in such a way, that it works with the streams backpressure. But you do not need a push pull stage.

Upvotes: 2

Related Questions