Reputation: 7005
I'm building an actor-based service in Scala where consumers can query whether clients are authorized and can also authorize clients.
If a consumer queries the authorization state of a client that isn't authorized yet, the actor should wait for incoming Authorize messages within a specified timeout and then send a reply. It should be possible to execute IsAuthorized synchronously in the consumer's code, so that it blocks and waits for a reply. Something like:
    (service !? IsAuthorized(client)) match {
      case IsAuthorizedResponse(_, authorized) => // do something
    }
However, receiveWithin() in my actor never receives a message and always runs into the timeout.
Here's my code:
    case object WaitingForAuthorization
    case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
    case class IsAuthorized(client: Client)
    case class IsAuthorizedResponse(client: Client, authorized: Boolean)
    case class Authorize(client: Client)

    class ClientAuthorizationService {
      private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
      private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]

      def actor = Actor.actor {
        loop {
          react {
            case IsAuthorized(client: Client) => reply {
              if (authorized contains client) {
                IsAuthorizedResponse(client, true)
              } else {
                waiting += client
                var matched = false
                val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)
                while (!matched && Instant.now.isBefore(end)) {
                  // ERROR HERE: Never receives Authorize messages
                  receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                    case Authorize(authorizedClient: Client) => {
                      authorizeClient(authorizedClient)
                      if (authorizedClient == client) matched = true
                    }
                    case TIMEOUT => // do nothing since we handle the timeout in the while loop
                  }
                }
                IsAuthorizedResponse(client, matched)
              }
            }
            case Authorize(client: Client) => authorizeClient(client)
            case WaitingForAuthorization => reply {
              WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
            }
          }
        }
      }

      private def authorizeClient(client: Client) = synchronized {
        authorized += client
        waiting -= client
      }
    }

    object ClientAuthorizationService {
      val AUTH_TIMEOUT: Long = 60 * 1000
    }
When I send an Authorize message to the actor while it's in the receiveWithin block, the message is caught by the second case statement further down (the outer case Authorize), which should only catch these messages when no one is waiting for a response at that time.
What's wrong with my code?
Update:
Here's a shortened version of the relevant code. It represents much simpler and different logic, but may clarify the problem better:
    loop {
      react {
        case IsAuthorized(client: Client) => reply {
          var matched = false
          // In the "real" logic we would actually loop here until either the
          // authorized client matches the requested client or the timeout is hit.
          // For the sake of the demo we only take the first Authorize message.
          receiveWithin(60*1000) {
            // Although Authorize is sent to the actor, it's never caught here
            case Authorize(authorizedClient: Client) => matched = authorizedClient == client
            case TIMEOUT =>
          }
          IsAuthorizedResponse(client, matched)
        }
        case Authorize(client: Client) => // this case is hit
      }
    }
Update 2:
I finally solved the problem. I think the problem was that the actor was blocking when trying to receive an Authorize message within the reply to the preceding IsAuthorized message.
I rewrote the code so that an anonymous actor is started when we're waiting for an Authorize message. Here's the code for those who are interested. waiting is a Map[Client, Actor].
    loop {
      react {
        case IsAuthorized(client: Client) =>
          if (authorized contains client) {
            sender ! IsAuthorizedResponse(client, true)
          } else {
            val recipient = sender
            // Start an anonymous actor that waits for an Authorize message
            // within a given timeout and sends a reply to the consumer.
            // The actor will be notified by the parent actor below.
            waiting += client -> Actor.actor {
              val cleanup = () => {
                waiting -= client
                exit()
              }
              receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                case Authorize(c) =>
                  recipient ! IsAuthorizedResponse(client, true)
                  cleanup()
                case TIMEOUT =>
                  recipient ! IsAuthorizedResponse(client, false)
                  cleanup()
              }
            }
          }
        case Authorize(client: Client) =>
          authorized += client
          waiting.get(client) match {
            case Some(actor) => actor ! Authorize(client)
            case None =>
          }
        case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
      }
    }
If there are better ways to solve this problem please let me know!
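For comparison, the same "block until authorized or until the timeout expires" behaviour can be sketched without the actor library at all. This is only a sketch under assumptions, not a drop-in replacement: it swaps the actor mailbox for scala.concurrent.Promise and Await, uses plain strings in place of the Client type, and the names AuthorizationSketch, promiseFor, authorize and isAuthorized are made up for illustration.

```scala
import java.util.concurrent.TimeoutException
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical sketch: clients are plain strings here, not the Client type from the post.
object AuthorizationSketch {
  // One promise per client; it is completed once that client has been authorized.
  private val promises = TrieMap.empty[String, Promise[Unit]]

  // Returns the promise registered for the client, registering a fresh one atomically if needed.
  private def promiseFor(client: String): Promise[Unit] = {
    val fresh = Promise[Unit]()
    promises.putIfAbsent(client, fresh).getOrElse(fresh)
  }

  // Counterpart of the Authorize message: completes the promise (safe to call repeatedly).
  def authorize(client: String): Unit = {
    promiseFor(client).trySuccess(())
    ()
  }

  // Counterpart of IsAuthorized: blocks the calling thread for at most `timeout`.
  def isAuthorized(client: String, timeout: FiniteDuration): Boolean =
    try {
      Await.result(promiseFor(client).future, timeout)
      true
    } catch {
      case _: TimeoutException => false
    }
}
```

A consumer would call AuthorizationSketch.isAuthorized("alice", 60.seconds) and get true as soon as some other thread calls AuthorizationSketch.authorize("alice"), or false once the timeout has passed. The waiting set from the original code is not modelled here.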
Upvotes: 3
Views: 473
Reputation: 29538
Isn't reply the problem? In
    case IsAuthorized(client: Client) => reply { ... }
all the code is in the argument to reply, so it is executed (including the receiveWithin) before the reply is actually sent. This means that by the time your client processes your reply, you will no longer be waiting for its message.
In your original code, it should probably be something like
    case IsAuthorized(client: Client) =>
      if (authorized contains client) reply(IsAuthorizedResponse(client, true))
      else {
        reply(IsAuthorizedResponse(client, false))
        receiveWithin(...)
      }
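The evaluation order behind this can be checked in isolation. The sketch below does not use the actor library; it assumes, as the answer implies, that reply takes its argument by value, and stands in for it with a hypothetical reply function that records what happens into a log. The names ReplyOrderDemo and run are made up for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ReplyOrderDemo {
  // Hypothetical stand-in for the actor's reply: the message is a by-value parameter,
  // so the block passed as `msg` is fully evaluated before this body runs.
  def reply(log: ListBuffer[String])(msg: Any): Unit = {
    log += s"reply sent: $msg"
    ()
  }

  // Runs the demo and returns the recorded events in the order they happened.
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    reply(log) {
      log += "waiting for Authorize (stands in for receiveWithin)"
      log += "wait finished"
      "IsAuthorizedResponse(client, false)"
    }
    log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Both "waiting" entries are recorded before "reply sent", which is the ordering the answer describes: the whole block, including any blocking wait, finishes before anything is sent back to the caller.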
Upvotes: 0