Soumya Simanta
Soumya Simanta

Reputation: 11741

Multiple Future calls in an Actor's receive method

I'm trying to make two external calls (to a Redis database) inside an Actor's receive method. Both calls return a Future and I need the result of the first Future inside the second. I'm wrapping both calls inside a Redis transaction to avoid anyone else from modifying the value in the database while I'm reading it.

The internal state of the actor is updated based on the value of the second Future.

Here is what my current code looks like which I is incorrect because I'm updating the internal state of the actor inside a Future.onComplete callback.

I cannot use the PipeTo pattern because I need both both Future have to be in a transaction. If I use Await for the first Future then my receive method will block. Any idea how to fix this ?

My second question is related to how I'm using Futures. Is this usage of Futures below correct? Is there a better way of dealing with multiple Futures in general? Imagine if there were 3 or 4 Future each depending on the previous one.

import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient

import scala.concurrent.Future
import scala.util.{Failure, Success}


object GetSubscriptionsDemo extends App {
  val akkaSystem = akka.actor.ActorSystem("redis-demo")
  val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
  actor ! UpdateState
}

case object UpdateState

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  //required by Future
  implicit val ctx = context dispatcher

  var rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => {
      log.info("Start of UpdateState ...")

      val tran = rClient.transaction()

      val zf: Future[Long] = tran.zcard(key)  //FIRST Future 
      zf.onComplete {

        case Success(z) => {
          //SECOND Future, depends on result of FIRST Future 
          val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 
          rf.onComplete {
            case Success(x) => {
              //convert ByteString to UTF8 String
              val v = x.map(_.utf8String)
              log.info(s"Updating state with $v ")
              //update actor's internal state inside callback for a Future
              //IS THIS CORRECT ?
              mutableState ++ v
            }
            case Failure(e) => {
              log.warning("ZRANGE future failed ...", e)
            }
          }
        }
        case Failure(f) => log.warning("ZCARD future failed ...", f)
      }
      tran.exec()

    }
  }

}

The compiles but when I run it gets struck.

2014-08-07  INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379

UPDATE 1

In order to use pipeTo pattern I'll need access to the tran and the FIRST Future (zf) in the actor where I'm piping the Future to because the SECOND Future depends on the value (z) of FIRST.

    //SECOND Future, depends on result of FIRST Future 
      val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 

Upvotes: 3

Views: 1321

Answers (2)

cmbaxter
cmbaxter

Reputation: 35453

Without knowing too much about the redis client you are using, I can offer an alternate solution that should be cleaner and won't have issues with closing over mutable state. The idea is to use a master/worker kind of situation, where the master (the SimpleRedisActor) receives the request to do the work and then delegates off to a worker that performs the work and responds with the state to update. That solution would look something like this:

object SimpleRedisActor{
  case object UpdateState
  def props(ip:String, key:String) = Props(classOf[SimpleRedisActor], ip, key)
}

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
  import SimpleRedisActor._
  import SimpleRedisWorker._

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  val rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => 
      log.info("Start of UpdateState ...")      
      val worker = context.actorOf(SimpleRedisWorker.props)
      worker ! DoWork(rClient, key)

    case WorkResult(result) =>
      mutableState ++ result

    case FailedWorkResult(ex) =>
      log.error("Worker got failed work result", ex)
  }
}

object SimpleRedisWorker{
  case class DoWork(client:RedisClient, key:String)
  case class WorkResult(result:Seq[String])
  case class FailedWorkResult(ex:Throwable)
  def props = Props[SimpleRedisWorker]
}

class SimpleRedisWorker extends Actor{
  import SimpleRedisWorker._
  import akka.pattern.pipe
  import context._

  def receive = {
    case DoWork(client, key) =>
      val trans = client.transaction()
      trans.zcard(key) pipeTo self
      become(waitingForZCard(sender, trans, key) orElse failureHandler(sender, trans))
  }

  def waitingForZCard(orig:ActorRef, trans:RedisTransaction, key:String):Receive = {      
    case l:Long =>
      trans.zrange(key, l -1, l) pipeTo self
      become(waitingForZRange(orig, trans) orElse failureHandler(orig, trans))
  }

  def waitingForZRange(orig:ActorRef, trans:RedisTransaction):Receive = {
    case s:Seq[ByteString] =>
      orig ! WorkResult(s.map(_.utf8String))
      finishAndStop(trans)
  }

  def failureHandler(orig:ActorRef, trans:RedisTransaction):Receive = {
    case Status.Failure(ex) => 
      orig ! FailedWorkResult(ex)
      finishAndStop(trans)   
  }

  def finishAndStop(trans:RedisTransaction) {
    trans.exec()
    context stop self
  }
}

The worker starts the transaction and then makes calls into redis and ultimately completes the transaction before stopping itself. When it calls redis, it gets the future and pipes back to itself for the continuation of the processing, changing the receive method between as a mechanism of showing progressing through its states. In a model like this (which I suppose is somewhat similar to the error kernal pattern), the master owns and protects the state, delegating the "risky" work off to a child who can figure out what the change for the state should be, but the changing is still owned by the master.

Now again, I have no idea about the capabilities of the redis client you are using and if it is safe enough to even do this kind of stuff, but that's not really the point. The point was to show a safer structure for doing something like this that involves futures and state that needs to be changed safely.

Upvotes: 1

pushy
pushy

Reputation: 9635

Using the callback to mutate internal state is not a good idea, excerpt from the akka docs:

When using future callbacks, such as onComplete, onSuccess, and onFailure, inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback.

Why do you worry about pipeTo and transactions? Not sure how redis transactions work, but I would guess that the transaction does not encompass the onComplete callback on the second future anyways.

I would put the state into a separate actor which you pipe the future too. This way you have a separate mailbox, and the ordering there will be the same as the ordering of the messages that came in to modify the state. Also if any read requests come in, they will also be put in the correct order.

Edit to respond to edited question: Ok, so you don't want to pipe the first future, that makes sense, and should be no problem as the first callback is harmless. The callback of the second future is the problem, as it manipulates the state. But this future can be pipe without the need for access to the transaction.

So basically my suggestion is:

val firstFuture = tran.zcard
firstFuture.onComplete {
   val secondFuture = tran.zrange
   secondFuture pipeTo stateActor
}

With stateActor containing the mutable state.

Upvotes: 0

Related Questions