Reputation: 1161
Background:
I have the following flow process:
PeerRequest
proto (via http) to another instancePeerServer
receives and process the requestasks
(using ?
) the PeerManager
actor to process the requestPeerManager
asks
(?
) the Peer
Actor (via its ID, a field of PeerRequest
) to process the PeerRequest.PeerChat
messagePeer
does some stuff (decryption, adding the chat to the DB, encrypting a response) and responds with a PeerChatResponse
This process looks like this:
PeerServer
handling:
else if (peerRequest.request.isPeerChat) {
// forward the PeerChat to PeerManager
val future: Future[PeerResponse] = (peerManager ? peerRequest.getPeerChat).mapTo[PeerResponse]
try {
val result = Await.result(future, timeout.duration)
resultingRoute = complete(result)
} catch {
case e: TimeoutException =>
resultingRoute = complete(PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse))
}
}
PeerManager
case for PeerChat
:
case peerChat@PeerChat(_, serverID, _, _) =>
val peerEntry = peerMap.get(serverID)
if (peerEntry.isDefined) {
try {
val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)
if (state != null && state.name == "WaitMessages") {
val peerResponse = Await.result((peerEntry.get ? (self, peerChat)).mapTo[PeerChatResponse], timeout.duration)
sender() ! PeerResponse().withPeerChatResponse(peerResponse)
}
} catch {
case e: TimeoutException =>
sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
}
} else {
sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
Peer
case for handling (ActorRef, PeerChat)
:
case (sender: ActorRef, peerChat@PeerChat(chatID, _, encryptedByteString, _)) =>
system.log.info("processing PeerChat")
try {
// decrypt and store the chat in the database
val decodedData = Base64.getDecoder.decode(encryptedByteString)
val unencryptedBytes = peer.getEncryptionManager.decrypt(decodedData)
// re-form the bytes back into a ChatRequest
val chatRequest: ChatRequest = ChatRequest.parseFrom(unencryptedBytes)
// verify we don't already have this chat in the database
val database = DatabaseUtil.getInstance
val chats = database.queryByFields(classOf[Chat], Map(
"serverID" -> Seq(chatRequest.serverID),
"timestamp" -> Seq(chatRequest.timestamp),
"text" -> Seq(chatRequest.chatText)
))
if (chats.isEmpty) {
// store the chat
val chat = new Chat(chatRequest)
chat.sent = true
chat.confirmed = true
database.addWithGeneratedID(chat)
}
// tell the sender it was successful
val encryptedResponse = peer.getEncryptionManager.encrypt(ByteBuffer.allocate(4).putInt(chatID).array())
val encodedData = Base64.getEncoder.encodeToString(encryptedResponse)
val peerChatResponse = PeerChatResponse(encodedData)
system.log.info(s"Replying with PeerChatResponse:${peerChatResponse.toProtoString}")
sender ! peerChatResponse
} catch {
case e: Exception =>
system.log.error(e, "Failed processing PeerChat")
}
I would like to point out, I am passing the PeerManager
self
because when the Peer
receives this message, the receive
behaviour is defined in a State class and therefore does not have direct access to the normal sender()
.
The Problem:
When the Peer
receives the (ActorRef, PeerChat)
from the PeerManager
, I get the error:
Message [peer.chat.PeerChatResponse.PeerChatResponse] to Actor[akka://system-actor/user/peer_manager#1099549428] was unhandled.
I would also add that this process works for other states, with the exact same flow. When this happens with the other messages, the Await.result
seems to handle the response from the Peer
. However, for this specific process with PeerChat
, it seems the response is being sent directly to PeerManager
and not handled by the Await.result
future. The process is identical except the other process passes down a SessionRequest
message.
Is there any suggestions as to why or if I am doing this wrong?
Upvotes: 1
Views: 46
Reputation: 1161
I have solved this issue by using the Actor forward
functionality.
The updated PeerManager
:
val peerEntry = peerMap.get(serverID)
val savedSender = sender()
if (peerEntry.isDefined) {
try {
val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)
if (state != null && state.name == "WaitMessages") {
peerEntry.get.forward((savedSender, peerChat))
}
} catch {
case e: TimeoutException =>
savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
}
} else {
savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
}
Then in the Peer
case for managing the PeerChat
, I respond with a full ApiResponse
, as the future in the server expects.
This ensures that the server Future receives the PeerResponse
by saving the original sender and reserving it down the process chain.
Upvotes: 1