Geo

Reputation: 96947

How should I handle this use case using EventMachine?

I have an application that reacts to messages sent by clients. One message is reload_credentials, which the application receives any time a new client registers. When it receives this message, the application connects to a PostgreSQL database, queries all the credentials, and stores them in a regular Ruby hash (client_id => client_token).

Some other messages that the application may receive are start, stop, and pause, which are used to keep track of some session times. My point is that I envision the application functioning in the following way:

However, for example, I don't want to block the reactor. Furthermore, let's imagine I have a reload_credentials message that's next in the queue. I don't want any other message from the queue to be processed until the credentials are reloaded from the DB. Also, while I am processing a certain message (like waiting for the credentials query to finish), I want to allow other messages to be enqueued.

Could you please guide me towards solving such a problem? I'm thinking I may have to use em-synchrony, but I am not sure.

Upvotes: 8

Views: 520

Answers (2)

raggi

Reputation: 1297

The following is, I presume, something like your current implementation:

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end


    q = EM::Queue.new

    workers = Array.new(10) { Worker.new q }

The problem with the above, if I understand you correctly, is that you don't want workers picking up jobs that arrived later in the producer timeline than a reload_credentials job until that reload has completed. The following should service this (additional words of caution at the end).

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    class LockingDispatcher
      def initialize channel, queue
        @channel = channel
        @queue = queue

        @backlog = []
        @channel.subscribe method(:dispatch_with_locking)

        @locked = false
      end

      def dispatch_with_locking item
        if locked?
          @backlog << item
        else
          # You probably want to move the specialization here out into a method or
          # a block that's passed into the constructor, to make the LockingDispatcher
          # more of a generic processor.
          case item.type
          when :reload_credentials
            lock
            deferrable = CredentialReloader.new(item).start
            deferrable.callback { unlock }
            deferrable.errback  { unlock }
          else
            dispatch_without_locking item
          end
        end
      end

      def dispatch_without_locking item
        @queue << item
      end

      def locked?
        @locked
      end

      def lock
        @locked = true
      end

      def unlock
        @locked = false
        bl = @backlog.dup
        @backlog.clear
        bl.each { |item| dispatch_with_locking item }
      end

    end

    channel = EM::Channel.new
    queue = EM::Queue.new

    dispatcher = LockingDispatcher.new channel, queue

    workers = Array.new(10) { Worker.new queue }

So, input to the first system comes in on q, but in this new system it comes in on channel. The queue is still used for work distribution among the workers, but it is not populated while a refresh-credentials operation is going on. Unfortunately, as I didn't take more time, I have not generalized the LockingDispatcher, so it remains coupled to the item type and to the code for dispatching CredentialReloader. I'll leave that to you.
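
For example, with this wiring a producer publishes onto the channel (inside the running reactor) rather than pushing onto the queue directly. The Item struct below is just a hypothetical stand-in for any message object that responds to #type:

    Item = Struct.new(:type, :payload)

    channel << Item.new(:start, { client_id: 42 })   # dispatched straight to the worker queue
    channel << Item.new(:reload_credentials, nil)    # locks the dispatcher while CredentialReloader runs
    channel << Item.new(:stop,  { client_id: 42 })   # backlogged until the reload finishes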

You should note here that whilst this services what I understand of your original request, it is generally better to relax this kind of requirement. There are several outstanding problems that essentially cannot be eradicated without altering that requirement:

  • The system does not wait for executing jobs to complete before starting credentials jobs
  • The system will handle bursts of credentials jobs very badly: other items that might be processable won't be.
  • In the case of a bug in the credentials code, the backlog could fill up RAM and cause failure. A simple timeout might be enough to avoid catastrophic effects, provided the code is abortable and subsequent messages are sufficiently processable to avoid further deadlocks (see the sketch after this list).
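
A minimal sketch of that timeout idea, assuming (as above) that CredentialReloader#start returns an EM::Deferrable. Here dispatch_reload is a hypothetical extraction of the :reload_credentials branch, and the 30-second figure is arbitrary:

    def dispatch_reload item
      lock
      deferrable = CredentialReloader.new(item).start
      deferrable.timeout 30              # EM::Deferrable#timeout fails the deferrable after 30s
      deferrable.callback { unlock }
      deferrable.errback  { unlock }     # fires on failure or timeout, so the lock is always released
    end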

It actually sounds like you have some notion of a user id in the system. If you think through your requirements, it's likely that you only need to backlog items that pertain to a user whose credentials are in a refresh state. This is a different problem that involves a different kind of dispatching. Try a hash of locked backlogs for those users, with a callback on credential completion to drain those backlogs into the workers, or some similar arrangement.
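
One rough sketch of that arrangement (item.user_id and the draining policy are assumptions, and error handling is omitted):

    class PerUserDispatcher
      def initialize queue
        @queue    = queue
        @backlogs = {}   # user_id => items held back while that user's credentials refresh
      end

      def dispatch item
        if @backlogs.key? item.user_id
          @backlogs[item.user_id] << item
        elsif item.type == :reload_credentials
          @backlogs[item.user_id] = []
          deferrable = CredentialReloader.new(item).start
          deferrable.callback { drain item.user_id }
          deferrable.errback  { drain item.user_id }
        else
          @queue << item
        end
      end

      private

      # Push the held-back items for this user into the shared worker queue.
      def drain user_id
        (@backlogs.delete(user_id) || []).each { |item| @queue << item }
      end
    end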

Good luck!

Upvotes: 4

simulacre

Reputation: 1243

Use one of the PostgreSQL EM drivers, or EM.defer, so that you won't block the reactor.

When you receive the 'reload_credentials' message, just flip a flag that causes all subsequent messages to be enqueued. Once the credentials reload has finished, process all messages from the queue. After the queue is empty, flip the flag back so that messages are processed as they are received.

EM drivers for PostgreSQL are listed here: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

    module Server
      def post_init
        @queue               = []
        @loading_credentials = false
      end

      def receive_message(type, data)
        # While credentials are reloading (or a backlog exists), only enqueue.
        return @queue << [type, data] if @loading_credentials || !@queue.empty?
        return process_msg(type, data) unless :reload_credentials == type
        @loading_credentials = true
        reload_credentials do
          @loading_credentials = false
          process_queue
        end
      end

      def reload_credentials(&when_done)
        # Run the blocking query on EM's thread pool; when_done is invoked on
        # the reactor thread once it completes.
        EM.defer(proc { query_and_load_credentials }, when_done)
      end

      def process_queue
        until @queue.empty?
          type, data = @queue.shift
          process_msg(type, data)
        end
      end

      # lots of other methods
    end

    EM.start_server(HOST, PORT, Server)
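
query_and_load_credentials is left among the "lots of other methods"; one possible shape for it, using the plain pg gem (blocking is fine here because EM.defer runs it on EventMachine's thread pool, off the reactor thread). The connection parameters, table, and column names are assumptions, and the result is stored in an instance variable purely for illustration:

    require 'pg'

    def query_and_load_credentials
      conn = PG.connect(dbname: 'myapp')   # hypothetical connection settings
      rows = conn.exec('SELECT client_id, client_token FROM credentials')
      @credentials = rows.each_with_object({}) do |row, hash|
        hash[row['client_id']] = row['client_token']
      end
    ensure
      conn.close if conn
    end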

If you want all connections to queue messages whenever any connection receives a 'reload_credentials' message, you'll have to coordinate via the eigenclass.
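
A rough sketch of that eigenclass-level coordination (this reopens the Server module above; the shared flag and queue live on the module itself, so every connection consults the same state, and method names mirror the snippet above):

    module Server
      class << self
        attr_accessor :loading_credentials

        def queue
          @queue ||= []
        end
      end

      def receive_message(type, data)
        return Server.queue << [self, type, data] if Server.loading_credentials || !Server.queue.empty?
        return process_msg(type, data) unless :reload_credentials == type
        Server.loading_credentials = true
        reload_credentials do
          Server.loading_credentials = false
          until Server.queue.empty?
            conn, queued_type, queued_data = Server.queue.shift
            conn.process_msg(queued_type, queued_data)
          end
        end
      end
    end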

Upvotes: 7
