Toaditoad

Reputation: 304

Ways to maintain back pressure in Akka Streams involving multiple JVMs

I'm aware that as of Akka 2.4.16, there is no "remote" implementation of Reactive Streams. The specification focuses on a stream running on a single JVM.

However, consider a use case that involves another JVM for some processing while maintaining back pressure. The idea is to have a main application that provides a user interface and runs a stream. For instance, this stream has a stage performing heavy computations that should run on a different machine. I'm interested in ways to run streams in a distributed way. I came across some articles pointing out some ideas:

What other alternatives are there? Are there any significant downsides to the above? Any special characteristics to consider?

Update: This question is not limited to a single use case. I'm generally interested in all possible ways to work with streams in a distributed environment. For example, it could involve a single stream that integrates actors with .mapAsync, or two separate streams on two machines communicating via Akka HTTP. The only requirement is that back pressure is enforced across all components.

Upvotes: 4

Views: 1235

Answers (2)

Robin Green

Reputation: 33063

In Akka 2.5.10 and above, you can use StreamRefs for this. StreamRefs are designed for exactly this use case and are particularly suitable for remote work queues, because they apply back pressure until the stream attached to them locally can accept more work.
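
As a minimal sketch of what that can look like: the producing side turns a Source into a SourceRef, and the consuming side attaches to ref.source. This assumes the Akka 2.6.x API, where materializing StreamRefs.sourceRef() yields a SourceRef directly; on 2.5.x the materialized value is a Future[SourceRef[T]] instead. The object name StreamRefSketch is made up for illustration, and both ends run in one JVM here only to keep the example self-contained. In a real setup the SourceRef would be sent inside a message to an actor on the other node.

```scala
import akka.actor.ActorSystem
import akka.stream.SourceRef
import akka.stream.scaladsl.{Sink, Source, StreamRefs}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name, for illustration only
object StreamRefSketch {

  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, where the implicit system also provides the materializer
    implicit val system: ActorSystem = ActorSystem("stream-ref-sketch")
    try {
      // Producer side: expose a local Source as a SourceRef
      val ref: SourceRef[Int] = Source(1 to 10).runWith(StreamRefs.sourceRef())

      // Consumer side: normally running in another JVM after receiving `ref` in a message.
      // Elements are only sent as the consumer signals demand.
      val received = ref.source.runWith(Sink.seq)

      Await.result(received, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling StreamRefSketch.run() should give back the elements in their original order. A SinkRef (from StreamRefs.sinkRef()) works the same way in the other direction, when the remote side should push data to you.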

Upvotes: 0

sarveshseri

Reputation: 13985

Well... It seems that I will have to add an example for that. One thing that you need to understand is that back pressure is handled by the async boundaries in GraphStages. It really has nothing to do with a component existing somewhere else. Also, it does not depend on Artery, which is nothing but the new remote transport.

Here is an example of probably the simplest cross-JVM stream.

First application:

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// Echo actor: logs every message and sends it back to the sender
class MyActor extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      log.info(msg.toString)
      sender() ! msg
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  val myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

And the second application, which actually "runs" the stream that uses the actor in the first application:

import akka.actor.{ActorPath, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.pattern.ask
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps
import scala.concurrent.duration._

object YourApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="127.0.0.1"
      |      port=19000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("your-actor-system", config)

  import actorSystem.dispatcher

  val logger = actorSystem.log

  implicit val implicitActorSystem = actorSystem
  implicit val actorMaterializer = ActorMaterializer()

  val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@127.0.0.1:18000/user/my-actor")

  val myActorSelection = actorSystem.actorSelection(myActorPath)

  val source = Source(1 to 10)

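  // "mapAsync" wraps the given Int => Future[Int] function in a GraphStage.
  // mapAsync(2) allows at most 2 asks in flight and only pulls the next element
  // once a slot frees up, so a slow remote actor back-pressures the source.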
  val myRemoteComponent = Flow[Int].mapAsync(2)(i => {
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
      (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int])
    )
  })

  val sink = Sink.foreach[Int](i => logger.info(i.toString))

  val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right)

  val streamRun = stream.run()

}
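
If you want to convince yourself that mapAsync really limits the outstanding requests, here is a small single-JVM sketch. It is not part of the two applications above: slowEcho is a made-up stand-in for the remote ask, and the object name and the counters are only there for illustration. It counts how many calls are in flight at once and records the maximum.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical name, for illustration only
object MapAsyncBackPressureSketch {

  // Sends 1..n through mapAsync(parallelism); returns (results, max calls in flight)
  def run(n: Int, parallelism: Int): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Stand-in for the remote ask: a slow call that echoes its input
    def slowEcho(i: Int): Future[Int] = {
      // mapAsync invokes this function from inside the stage, one call at a time,
      // so a plain get-then-set on the maximum is sufficient here
      val now = inFlight.incrementAndGet()
      maxInFlight.set(math.max(maxInFlight.get, now))
      Future {
        Thread.sleep(50)
        i
      }.andThen { case _ => inFlight.decrementAndGet() } // runs before the stage sees the result
    }

    val done = Source(1 to n).mapAsync(parallelism)(slowEcho).runWith(Sink.seq)
    try {
      val results = Await.result(done, 30.seconds)
      (results, maxInFlight.get)
    } finally {
      system.terminate()
    }
  }
}
```

With run(10, 2) the recorded maximum should never exceed 2, and the results come out in input order. The same bound applies when the Future comes from an ask to a remote actor, which is why the source slows down when the other JVM does.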

Upvotes: 1
