Reputation: 96947
I have an application that reacts to messages sent by clients. One message is reload_credentials
, that the application receives any time a new client registers. This message will then connect to a PostgreSQL database, do a query for all the credentials, and then store them in a regular Ruby hash ( client_id => client_token ).
Some other messages that the application may receive are start
,stop
,pause
which are used to keep track of some session times. My point is that I envision the application functioning in the following way:
However, for example, I don't want to block the reactor. Furthermore, let's imagine I have a reload_credentials
message that's next in queue. I don't want any other message from the queue to be processed until the credentials are reloaded from the DB. Also, while I am processing a certain message ( like waiting for the credentials query to finish) , I want to allow other messages to be enqueued .
Could you please guide me towards solving such a problem? I'm thinking I may have to use em-synchrony
, but I am not sure.
Upvotes: 8
Views: 520
Reputation: 1297
The following is I presume, something like your current implementation:
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
The problem above, if I understand you correctly, is that you don't want workers working on new jobs (jobs that have arrived earlier in the producer timeline), than any reload_credentials jobs. The following should service this (additional words of caution at the end).
class Worker
def initialize queue
@queue = queue
dequeue
end
def dequeue
@queue.pop do |item|
begin
work_on item
ensure
dequeue
end
end
end
def work_on item
case item.type
when :reload_credentials
# magic happens here
else
# more magic happens here
end
end
end
class LockingDispatcher
def initialize channel, queue
@channel = channel
@queue = queue
@backlog = []
@channel.subscribe method(:dispatch_with_locking)
@locked = false
end
def dispatch_with_locking item
if locked?
@backlog << item
else
# You probably want to move the specialization here out into a method or
# block that's passed into the constructor, to make the lockingdispatcher
# more of a generic processor
case item.type
when :reload_credentials
lock
deferrable = CredentialReloader.new(item).start
deferrable.callback { unlock }
deferrable.errback { unlock }
else
dispatch_without_locking item
end
end
end
def dispatch_without_locking item
@queue << item
end
def locked?
@locked
end
def lock
@locked = true
end
def unlock
@locked = false
bl = @backlog.dup
@backlog.clear
bl.each { |item| dispatch_with_locking item }
end
end
channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
So, input to the first system comes in on q
, but in this new system it comes in on channel
. The queue
is still used for work distribution among workers, but the queue
is not populated while a refresh credentials operation is going on. Unfortunately, as I didn't take more time, I have not generalized the LockingDispatcher
such that it isn't coupled with the item type and code for dispatching CredentialsReloader
. I'll leave that to you.
You should note here that whilst this services what I understand of your original request, it is generally better to relax this kind of requirement. There are several outstanding problems that essentially cannot be eradicated without alterations in that requirement:
It actually sounds like you have some notion of a userid in the system. If you think through your requirements, it's likely possible that you only need to backlog items that pertain to a userid who's credentials are in a refresh state. This is a different problem, that involves a different kind of dispatching. Try a hash of locked backlogs for those users, with a callback on credential completion to drain those backlogs into the workers, or some similar arrangement.
Good luck!
Upvotes: 4
Reputation: 1243
Use one of the Postgresql EM drivers, or EM.defer so that you won't block the reactor.
When you receive the 'reload_credentials' message just flip a flag that causes all subsequent messages to be enqueued. Once the 'reload_credentials' has finished, process all messages from the queue. After the queue is empty flip the flag that causes messages to be processed as they are received.
EM drivers for Postgresql are listed here: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations
module Server
def post_init
@queue = []
@loading_credentials = false
end
def recieve_message(type, data)
return @queue << [type, data] if @loading_credentials || [email protected]?
return process_msg(type, data) unless :reload_credentials == type
@loading_credentials = true
reload_credentials do
@loading_credentials = false
process_queue
end
end
def reload_credentials(&when_done)
EM.defer( proc { query_and_load_credentials }, when_done )
end
def process_queue
while (type, data = @queue.shift)
process_msg(type, data)
end
end
# lots of other methods
end
EM.start_server(HOST, PORT, Server)
If you want all connections to queue messages whenever any connection receives a 'reload_connections' message you'll have to coordinate via the eigenclass.
Upvotes: 7