Sven Jacobs

Reputation: 7005

Scala actor: receiveWithin() doesn't receive messages

I'm building an actor-based service in Scala where consumers can query whether clients are authorized and can also authorize clients.

If a consumer queries the authorization state of a client and that client isn't authorized yet, the actor should wait for incoming Authorize messages within a specified timeout and then send a reply. It should be possible to execute IsAuthorized synchronously in the consumer's code, so that the call blocks and waits for a reply. Something like

(service !? IsAuthorized(client)) match {
  case IsAuthorizedResponse(_, authorized) => // do something
}

However, receiveWithin() in my actor never receives a message and always runs into the timeout.

Here's my code

case object WaitingForAuthorization
case class WaitingForAuthorizationResponse(clients: immutable.Set[Client])
case class IsAuthorized(client: Client)
case class IsAuthorizedResponse(client: Client, authorized: Boolean)
case class Authorize(client: Client)

class ClientAuthorizationService {
  private val authorized: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]
  private val waiting: mutable.Set[Client] = new mutable.HashSet[Client] with mutable.SynchronizedSet[Client]

  def actor = Actor.actor {
    loop {
      react {
        case IsAuthorized(client: Client) => reply {
          if (authorized contains client) {
            IsAuthorizedResponse(client, true)
          } else {
            waiting += client
            var matched = false;
            val end = Instant.now.plus(ClientAuthorizationService.AUTH_TIMEOUT)

            while (!matched && Instant.now.isBefore(end)) {
              // ERROR HERE: Never receives Authorize messages
              receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
                case Authorize(authorizedClient: Client) => {
                  authorizeClient(authorizedClient)
                  if (authorizedClient == client) matched = true
                }
                case TIMEOUT => // do nothing since we handle the timeout in the while loop
              }
            }

            IsAuthorizedResponse(client, matched)
          }
        }

        case Authorize(client: Client) => authorizeClient(client)
        case WaitingForAuthorization => reply {
          WaitingForAuthorizationResponse(immutable.Set() ++ waiting)
        }
      }
    }
  }

  private def authorizeClient(client: Client) = synchronized {
    authorized += client
    waiting -= client
  }
}

object ClientAuthorizationService {
  val AUTH_TIMEOUT: Long = 60 * 1000;
}

When I send an Authorize message to the actor while it's in the receiveWithin block, the message is caught by the second case statement below, which should only catch these messages when no one is waiting for a response at that time.

What's wrong with my code?
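
One detail in the listing above that may be worth checking: actor is declared with def, so every access to service.actor evaluates Actor.actor { ... } again and yields a new actor. Whether that plays a role here depends on how the service is called, which the post doesn't show. The sketch below only illustrates the def versus val difference; Mailbox, ServiceWithDef and ServiceWithVal are hypothetical stand-ins, not part of the original code.

```scala
object DefVersusVal {
  // Hypothetical stand-in for the actor that Actor.actor { ... } would create.
  final class Mailbox

  // `def`: the right-hand side is evaluated on every access.
  class ServiceWithDef { def actor: Mailbox = new Mailbox }

  // `val`: the right-hand side is evaluated once, when the service is constructed.
  class ServiceWithVal { val actor: Mailbox = new Mailbox }

  def main(args: Array[String]): Unit = {
    val withDef = new ServiceWithDef
    val withVal = new ServiceWithVal
    println(withDef.actor eq withDef.actor) // false: two different instances
    println(withVal.actor eq withVal.actor) // true: the same instance both times
  }
}
```

If two callers each go through a def like this, they would be talking to different instances, so a message sent by one would never reach the instance the other is waiting on.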

Update:

Here's a shortened version of the relevant code. It represents a much simpler and different logic, but may clarify the problem better:

loop {
  react {
    case IsAuthorized(client: Client) => reply {
      var matched = false

      // In the "real" logic we would actually loop here until either the
      // authorized client matches the requested client or the timeout is hit.
      // For the sake of the demo we only take the first Authorize message.

      receiveWithin(60*1000) {
        // Although Authorize is sent to the actor, it's never caught here
        case Authorize(authorizedClient: Client) => matched = authorizedClient == client
        case TIMEOUT => 
      }

      IsAuthorizedResponse(client, matched)
    }

    case Authorize(client: Client) => // this case is hit
  }
}

Update 2:

I finally solved the problem. I think the problem was that the actor was blocking when trying to receive an Authorize message within the reply to the preceding IsAuthorized message.

I rewrote the code so that an anonymous Actor is started whenever we're waiting for an Authorize message. Here's the code for those who are interested. waiting is a Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}

If there are better ways to solve this problem please let me know!

Upvotes: 3

Views: 473

Answers (2)

Sven Jacobs

Reputation: 7005

I finally solved the problem. I think the problem was that the actor was blocking when trying to receive an Authorize message within the reply to the preceding IsAuthorized message.

I rewrote the code so that an anonymous Actor is started whenever we're waiting for an Authorize message. Here's the code for those who are interested. waiting is a Map[Client, Actor].

loop {
  react {
    case IsAuthorized(client: Client) =>
      if (authorized contains client) {
        sender ! IsAuthorizedResponse(client, true)
      } else {
        val receipient = sender
        // Start an anonymous actor that waits for an Authorize message
        // within a given timeout and sends a reply to the consumer.
        // The actor will be notified by the parent actor below.
        waiting += client -> Actor.actor {
          val cleanup = () => {
            waiting -= client
            exit()
          }

          receiveWithin(ClientAuthorizationService.AUTH_TIMEOUT) {
            case Authorize(c) =>
              receipient ! IsAuthorizedResponse(client, true)
              cleanup()
            case TIMEOUT =>
              receipient ! IsAuthorizedResponse(client, false)
              cleanup()
          }
        }
      }

    case Authorize(client: Client) =>
      authorized += client

      waiting.get(client) match {
        case Some(actor) => actor ! Authorize(client)
        case None =>
      }

    case WaitingForAuthorization => sender ! WaitingForAuthorizationResponse(immutable.Set() ++ waiting.keySet)
  }
}
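
The same idea (hand the waiting to something other than the main loop, and notify it when an Authorize arrives) can also be sketched without scala.actors, using a Promise per waiting client from scala.concurrent. This is only a rough sketch under assumptions: Client is a hypothetical stand-in for the real type, AuthorizationRegistry and its method names are made up for illustration, and it assumes at most one waiter per client at a time, like the Map[Client, Actor] above.

```scala
import java.util.concurrent.TimeoutException
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the Client type from the question.
final case class Client(id: String)

// Hypothetical registry: one Promise per client that somebody is waiting on.
class AuthorizationRegistry {
  private val authorized = TrieMap.empty[Client, Boolean]
  private val waiting = TrieMap.empty[Client, Promise[Boolean]]

  // Counterpart of the Authorize message: record it and wake up a waiter, if any.
  def authorize(client: Client): Unit = {
    authorized.put(client, true)
    waiting.remove(client).foreach(_.trySuccess(true))
  }

  // Counterpart of IsAuthorized: blocks the calling thread, not the registry.
  def isAuthorized(client: Client, timeout: FiniteDuration): Boolean =
    if (authorized.contains(client)) true
    else {
      val promise = waiting.getOrElseUpdate(client, Promise[Boolean]())
      // Re-check in case authorize() ran between the first check and the registration.
      if (authorized.contains(client)) promise.trySuccess(true)
      try Await.result(promise.future, timeout)
      catch { case _: TimeoutException => false }
      finally waiting.remove(client, promise)
    }

  // Counterpart of WaitingForAuthorization.
  def waitingClients: Set[Client] = waiting.keySet.toSet
}

object RegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new AuthorizationRegistry
    val client = Client("alice")
    // Authorize from another thread while the main thread is blocked waiting.
    val authorizer = new Thread(() => { Thread.sleep(100); registry.authorize(client) })
    authorizer.start()
    println(registry.isAuthorized(client, 5.seconds))
    authorizer.join()
  }
}
```

The caller blocks in Await.result while authorize stays free to run on any other thread, which mirrors what the anonymous actor achieves in the actor-based version.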

If there are better ways to solve this problem please let me know!

Upvotes: 0

Didier Dupont

Reputation: 29538

Isn't reply the problem? In

case IsAuthorized(client: Client) => reply { ... }

all the code is inside the argument to reply, so it is executed (including the receiveWithin) before the reply is actually sent. This means that by the time your client processes your reply, you will no longer be waiting for its message.
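
To illustrate the evaluation order, here is a minimal sketch that doesn't use scala.actors at all. send is a hypothetical stand-in for reply, and it assumes that reply takes its message by value like an ordinary method argument, so a block passed to it runs to completion before anything is sent.

```scala
import scala.collection.mutable.ListBuffer

object ReplyOrderDemo {
  // Hypothetical stand-in for reply: an ordinary method with a by-value argument.
  def send(msg: Any, events: ListBuffer[String]): Unit =
    events += s"sent: $msg"

  // Returns the order in which the block body and the send happened.
  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    send({
      // This is where the receiveWithin call would sit and block.
      events += "block evaluated"
      "response"
    }, events)
    events.toList
  }

  def main(args: Array[String]): Unit =
    run().foreach(println) // prints "block evaluated", then "sent: response"
}
```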

In your original code, it should probably be something like

case IsAuthorized(client: Client) =>
  if (authorized contains client) reply(IsAuthorizedResponse(client, true))
  else {
    // Send the reply first, then wait for further messages
    reply(IsAuthorizedResponse(client, false))
    receiveWithin(...)
  }

Upvotes: 0
