zj43
zj43

Reputation: 15

when an actor contains an async method, this will lead to dead letter error with the Ask time out exception

I use ask mode send a request to an actor called actor-A in a function called fun-F. The actor will get an ID generated by another system in an async way, when this is completed, I will forward a message contains this ID to another actor called actor-B, the actor-B will do some DB operations and then send back the DB operation result in a message to the sender, since in my case I use forward mode, so the actor-B recognize the sender as fun-F, the akka will give the fun-F a temporary actor name, so the returned value should be delivered to the temp actor.

My question is:

If I use sync-method to get the ID from another system, then forward this message to the actor-B, after actor-B's DB operation, the result can be delivered to the value of fun-F, and the fun-F is defined as temporary actor Actor[akka://ai-feedback-service/temp/$b] by the akka framework runtime.

If I use async-method to get the ID from another system, when it completed, I will forward the message in the oncompleted {} code block in another call back thread, the DB operation in actor-B is handled successfully, but the returned value cannot be delivered to the value defined in the fun-F, and in this case the fun-F is defined as Actor[akka://ai-feedback-service/deadLetters] by the akka framwork runtime. So the actor-B lose its way and do not know how to get back or where should this message be delivered, and this will cause an Ask time out exception throws in my log.

How can I handled this issue? or how can I avoid this dead letter ask time out exception?

Below is my code:

// this is the so-called fun-F [createFeedback]
def createFeedback(query: String, 
                   response: String, 
                   userId: Long, 
                   userAgent: String, 
                   requestId: Long, 
                   errType: Short, 
                   memo: String): Future[java.lang.Long] = {
    val ticket = Ticket(userId,
                        requestId,
                        query,
                        response,
                        errType,
                        userAgent,
                        memo)
    val issueId = (jiraActor ? CreateJiraTicketSignal(ticket))
                  .mapTo[CreateFeedbackResponseSignal].map{ r =>
        r.issueId.asInstanceOf[java.lang.Long]
    }
    issueId
}


//this is the so-called actor-A [jiraActor]
//receive method are run in its parent actor for some authorization
//in this actor only override the handleActorMsg method to deal msg
override def handleActorMsg(msg: ActorMsgSignal): Unit = {
    msg match {
        case s:CreateJiraTicketSignal =>
            val issueId = createIssue(cookieCache.cookieContext.flag,
                                     cookieCache.cookieContext.cookie,
                                     s.ticket)
            println(s">> ${sender()} before map $issueId")
            issueId.map{
                case(id:Long) =>
                    println(s">> again++issueId = $id ${id.getClass}")
                    println(s">>> $self / ${sender()}")
                    println("again ++ jira action finished")
                    dbActor.forward(CreateFeedbackSignal(id,s.ticket))
                case(message:String) if(!s.retry) =>
                    self ! CreateJiraTicketSignal(s.ticket,true)
                case(message:String) if(s.retry) =>
                    log.error("cannot create ticket :" + message)
            }
            println(s">> after map $issueId")
}


//this is the so-called actor-B [dbActor]
override def receive: Receive = {
    case CreateFeedbackSignal(issueId:Long, ticket:Ticket) =>
        val timestampTicks = System.currentTimeMillis()
        val description: String = Json.obj("question" -> ticket.query, 
                                          "answer" -> ticket.response)
                                          .toString()
        dao.createFeedback(issueId,
                           ticket.usrId.toString,
                           description,
                           FeedbackStatus.Open.getValue
                                .asInstanceOf[Byte],
                           new Timestamp(timestampTicks),
                           new Timestamp(timestampTicks),
                           ticket.usrAgent,
                           ticket.errType,
                           ticket.memo)

        println(s">> sender = ${sender()}")
        sender() ! (CreateFeedbackResponseSignal(issueId))
        println("db issue id is " + issueId)
        println("db action finished")
}

Upvotes: 0

Views: 429

Answers (1)

Jeffrey Chung
Jeffrey Chung

Reputation: 19497

To avoid the dead letters issue, do the following:

  1. For every request, use an identifier (probably the requestId) that you can associate with the ultimate target for the request. That is, tie the requestId that you're passing to the createFeedback method to the caller (ActorRef) of that method, then pass this id through your messaging chain. You can use a map to hold these associations.

    • Change CreateFeedbackResponseSignal(issueId) to include the requestId from the Ticket class: CreateFeedbackResponseSignal(requestId, issueId).
  2. When dealing with the asynchronous result of a Future from inside an actor, pipe the result of the Future to self instead of using a callback.

    • With this approach, the result of createIssue will be sent to jiraActor when the result is available. jiraActor then sends that result to dbActor.
    • jiraActor will be the sender in dbActor. When jiraActor receives the result from dbActor, jiraActor can look up the reference to the target in its internal map.

Below is a simple example that mimics your use case and is runnable in ScalaFiddle:

import akka.actor._
import akka.pattern.{ask, pipe}
import akka.util.Timeout

import language.postfixOps

import scala.concurrent._
import scala.concurrent.duration._

case class Signal(requestId: Long)
case class ResponseSignal(requestId: Long, issueId: Long)

object ActorA {
  def props(actorB: ActorRef) = Props(new ActorA(actorB))
}

class ActorA(dbActor: ActorRef) extends Actor {
  import context.dispatcher

  var targets: Map[Long, ActorRef] = Map.empty

  def receive = {
    case Signal(requestId) =>
      val s = sender
      targets = targets + (requestId -> s)
      createIssue(requestId).mapTo[Tuple2[Long, Long]].pipeTo(self) // <-- use pipeTo
    case ids: Tuple2[Long, Long] =>
      println(s"Sending $ids to dbActor")
      dbActor ! ids
    case r: ResponseSignal =>
      println(s"Received from dbActor: $r")
      val target = targets.get(r.requestId)
      println(s"In actorA, sending to: $target")
      target.foreach(_ ! r)
      targets = targets - r.requestId
  }
}

class DbActor extends Actor {
  def receive = {
    case (requestId: Long, issueId: Long) =>
      val response = ResponseSignal(requestId, issueId)
      println(s"In dbActor, sending $response to $sender")
      sender ! response
  }
}

val system = ActorSystem("jiratest")
implicit val ec = system.dispatcher

val dbActor = system.actorOf(Props[DbActor])
val jiraActor = system.actorOf(Props(new ActorA(dbActor)))

val requestId = 2L

def createIssue(requestId: Long): Future[(Long, Long)] = {
  println(s"Creating an issue ID for requestId[$requestId]")
  Future((requestId, 99L))
}

def createFeedback(): Future[Long] = {
  implicit val timeout = Timeout(5.seconds)
  val res = (jiraActor ? Signal(requestId)).mapTo[ResponseSignal]
  res.map(_.issueId)
}

createFeedback().onComplete { x =>
  println(s"Done: $x")
}

Running the above code in ScalaFiddle results in the following output:

Creating an issue ID for requestId[2]
Sending (2,99) to dbActor
In dbActor, sending ResponseSignal(2,99) to Actor[akka://jiratest/user/$b#-710097339]
Received from dbActor: ResponseSignal(2,99)
In actorA, sending to: Some(Actor[akka://jiratest/temp/$a])
Done: Success(99)

Upvotes: 1

Related Questions