Reputation: 582
This has been bothering me for some time -- it seems like something that should totally be doable, but I am stuck.
I have a small Ruby program that is just acting as a go-between. It runs a long (several minutes), blocking action (through an FFI interface), but is supposed to then send periodic updates that it gets from that action via callbacks to the main Meteor app through a DDP connection.
Both components of this program work on their own. Through a system I rolled me own, as well as the metybur gem, I'm able to communicate with the Meteor app. And, if i just use puts
to output the data from the callbacks of the FFI interface, I get those perfectly, too. (Except, for another reason I cannot quite put my finger on, the FFI/blocking action silently fails if its in a Thread.new
block.)
For some reason, however, when I try to send the data through to the Meteor app, nothing happens. ws.send
(on EventMachine) returns true
, though never actually gets called, even if i put it in its own Thread.new
block.
Part of me suspects (though cant figure out how to test it) that the connection is lost because the Ruby app cannot deal with the ping/pong keepalive requests during the blocking.
I've tried EM.spawn
from EventMachine for the blocking process, I've tried launching EventMachine in its own thread, but nothing seems to work.
Curious if there are best-practices for something like this, to be able to keep the EventMachine portion of the app responsive even during a CPU-intensive blocking operations?
Upvotes: 2
Views: 719
Reputation: 19221
EDITED
After our discussion in the comments, I decided to review the code I posted and write a small DDP encapsulation leveraging Iodine's Websocket Client (which I prefer, since I'm the author).
I have to admit, I really had fun thinking about this question. Attached is a simplified code for a Meteor connector using Iodine. This is truly basic and includes only: renewing the connection if dropped, completing the handshake and answering ping-pongs.
To use this code together with the FFI workflow concept initiated in the first answer, use:
# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
Iodine.debug "got message #{message}, it's a Hash"
end
# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
# initialize FFI interface
data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
# imagine it takes time
sleep 1
# Meteor will respond to these with error messages
(meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
sleep 1
Iodine.signal_exit
end
# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }
As to the Meteor DDP connection class, this could probably be achieved like so:
require 'iodine/client'
class IodineDDP
attr_reader :session
attr_reader :server_id
def initialize url, &block
@url = url
@ddp_initialized = false
@session = nil
@server_id = nil
@block = block
@closed = false
connect_websocket
end
def << message
Iodine.debug "Writing message #{message}"
ensure_connection
@ws << message
end
alias :write :<<
def close
@closed = true
@ws.on_close { nil }
@ws.close
end
protected
def on_message data
# make sure the DDP handshake is complete
return handshake data unless @ddp_initialized
data = JSON.parse(data)
Iodine.debug "Got message: #{data}"
return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
return true if data['msg'] == 'pong'
@block.call data
end
def on_close
@ddp_initialized = false
connect_websocket
end
def ensure_connection
return true unless @ws.closed? || !@ddp_initialized
raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
timeout = Iodine.time + 3
sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
end
def connect_websocket
@___on_message_proc ||= method(:on_message)
@___on_close_proc ||= method(:on_close)
@ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
# inform
Iodine.debug "initiating a new DDP connection to #{@url}"
# start the DDP handshake
handshake
end
def handshake last_message = nil
raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
unless last_message # this is the first message sent
Iodine.debug "Meteor DDP handshake initiated."
msg = {msg: "connect", version: "1", support: ["1"]}
msg[:session] = @session if @session
return(@ws << msg.to_json)
end
message = JSON.parse(last_message)
raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
if message['msg'] == 'connected'
# inform
Iodine.debug "Meteor DDP handshake complete."
@session = message['session']
return @ddp_initialized = true
else
return @server_id = message['server_id'] if message['server_id']
Iodine.error "Invalid handshake data - closing connection."
close
end
end
end
# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3
# # if we are inside a larger application, call:
# Iodine.force_start!
# # if we are on irb:
exit
# no need to write anything if this is the whole of the script
Upvotes: 1