Kris Rice

Reputation: 1161

Why is my actor ask getting dead letters?

Background:

I have the following message flow: the PeerServer forwards a PeerChat to the PeerManager, which passes it on to the matching Peer.

PeerServer handling:

      else if (peerRequest.request.isPeerChat) {
        // forward the PeerChat to PeerManager
        val future: Future[PeerResponse] = (peerManager ? peerRequest.getPeerChat).mapTo[PeerResponse]
        
        try {
          val result = Await.result(future, timeout.duration)
          resultingRoute = complete(result)
        } catch {
          case e: TimeoutException =>
            resultingRoute = complete(PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse))
        }
      }

PeerManager case for PeerChat:

    case peerChat@PeerChat(_, serverID, _, _) =>
      val peerEntry = peerMap.get(serverID)

      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)

          if (state != null && state.name == "WaitMessages") {
            val peerResponse = Await.result((peerEntry.get ? (self, peerChat)).mapTo[PeerChatResponse], timeout.duration)
            sender() ! PeerResponse().withPeerChatResponse(peerResponse)
          }
        } catch {
          case e: TimeoutException =>
            sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

Peer case for handling (ActorRef, PeerChat):

    case (sender: ActorRef, peerChat@PeerChat(chatID, _, encryptedByteString, _)) =>
      system.log.info("processing PeerChat")
      try {
        // decrypt and store the chat in the database
        val decodedData = Base64.getDecoder.decode(encryptedByteString)
        val unencryptedBytes = peer.getEncryptionManager.decrypt(decodedData)
        // re-form the bytes back into a ChatRequest
        val chatRequest: ChatRequest = ChatRequest.parseFrom(unencryptedBytes)
        // verify we don't already have this chat in the database

        val database = DatabaseUtil.getInstance
        val chats = database.queryByFields(classOf[Chat], Map(
          "serverID" -> Seq(chatRequest.serverID),
          "timestamp" -> Seq(chatRequest.timestamp),
          "text" -> Seq(chatRequest.chatText)
        ))

        if (chats.isEmpty) {
          // store the chat
          val chat = new Chat(chatRequest)
          chat.sent = true
          chat.confirmed = true
          database.addWithGeneratedID(chat)
        }
        // tell the sender it was successful
        val encryptedResponse = peer.getEncryptionManager.encrypt(ByteBuffer.allocate(4).putInt(chatID).array())
        val encodedData = Base64.getEncoder.encodeToString(encryptedResponse)
        val peerChatResponse = PeerChatResponse(encodedData)

        system.log.info(s"Replying with PeerChatResponse:${peerChatResponse.toProtoString}")

        sender ! peerChatResponse
      } catch {
        case e: Exception =>
          system.log.error(e, "Failed processing PeerChat")
      }

Note that I pass the PeerManager's self in the message because the Peer's receive behaviour is defined in a State class, which does not have direct access to the usual sender().

The Problem:

When the Peer receives the (ActorRef, PeerChat) from the PeerManager, I get the error:

Message [peer.chat.PeerChatResponse.PeerChatResponse] to Actor[akka://system-actor/user/peer_manager#1099549428] was unhandled.

This same flow works in other states. There, the Await.result picks up the response from the Peer. For PeerChat, however, the response appears to be sent directly to the PeerManager's mailbox instead of completing the future that Await.result is waiting on. The only difference is that the working flow passes down a SessionRequest message.
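For context on what the Peer would normally reply to: with `?`, the `sender()` seen by the receiving actor is a short-lived temporary actor that completes the future, not the actor that called `?`. A minimal sketch of that, assuming Akka classic actors on the classpath (the `Echo` actor and `AskSenderSketch` object are hypothetical names, not part of my project):

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor: replies with the path of whoever sent the message.
class Echo extends Actor {
  def receive: Receive = {
    case "who" => sender() ! sender().path.toString
  }
}

object AskSenderSketch {
  // Returns the path of the sender that an ask (`?`) presents to the target.
  def askerPath(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("sketch")
    try {
      val echo = system.actorOf(Props(new Echo), "echo")
      Await.result((echo ? "who").mapTo[String], timeout.duration)
    } finally {
      system.terminate()
    }
  }
}
```

The returned path sits under the system's `/temp` guardian. A reply sent to any other ref, such as an explicitly passed `self`, never reaches the future.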

Are there any suggestions as to why this happens, or am I doing something wrong?

Upvotes: 1

Views: 46

Answers (1)

Kris Rice

Reputation: 1161

I have solved this issue by using the Actor forward functionality.

The updated PeerManager:

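    case peerChat@PeerChat(_, serverID, _, _) =>
      val peerEntry = peerMap.get(serverID)
      // capture the original asker once, before doing any further work
      val savedSender = sender()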

      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)

          if (state != null && state.name == "WaitMessages") {
            peerEntry.get.forward((savedSender, peerChat))
          }
        } catch {
          case e: TimeoutException =>
            savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

Then in the Peer case for managing the PeerChat, I respond with a full ApiResponse, as the future in the server expects.

Saving the original sender and passing it down the process chain ensures that the server's Future receives the PeerResponse.
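As far as I can tell, the original version failed because the tuple carried the PeerManager's own `self`, so the Peer's reply landed in the PeerManager's mailbox (which has no case for PeerChatResponse) rather than at the temporary actor behind the `?`. Below is a minimal, self-contained sketch of the fixed pattern, assuming Akka classic actors; `DemoChat`, `DemoAck`, `Worker`, `Manager` and `ForwardSketch` are hypothetical stand-ins for PeerChat, PeerChatResponse, Peer, PeerManager and the server.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-ins for PeerChat and PeerChatResponse.
final case class DemoChat(id: Int)
final case class DemoAck(id: Int)

// Like the Peer: replies to the ActorRef carried in the message,
// because its behaviour cannot call sender() directly.
class Worker extends Actor {
  def receive: Receive = {
    case (replyTo: ActorRef, DemoChat(id)) => replyTo ! DemoAck(id)
  }
}

// Like the fixed PeerManager: captures the asker and hands it down.
class Manager(worker: ActorRef) extends Actor {
  def receive: Receive = {
    case chat: DemoChat =>
      val savedSender = sender() // the temporary actor created by `?`
      worker.forward((savedSender, chat))
  }
}

object ForwardSketch {
  // Plays the role of the server: asks the manager and waits for the reply.
  def run(): DemoAck = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("sketch")
    try {
      val worker = system.actorOf(Props(new Worker), "worker")
      val manager = system.actorOf(Props(new Manager(worker)), "manager")
      Await.result((manager ? DemoChat(7)).mapTo[DemoAck], timeout.duration)
    } finally {
      system.terminate()
    }
  }
}
```

Replacing `savedSender` with `self` in `Manager` reproduces the original symptom in this sketch: the `DemoAck` is delivered to `Manager`, which does not handle it, and the ask times out.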

Upvotes: 1
