Jared
Jared

Reputation: 582

Ruby Threads and Websockets

This has been bothering me for some time -- it seems like something that should totally be doable, but I am stuck.

I have a small Ruby program that is just acting as a go-between. It runs a long (several minutes), blocking action (through an FFI interface), but is supposed to then send periodic updates that it gets from that action via callbacks to the main Meteor app through a DDP connection.

Both components of this program work on their own. Through a system I rolled me own, as well as the metybur gem, I'm able to communicate with the Meteor app. And, if i just use puts to output the data from the callbacks of the FFI interface, I get those perfectly, too. (Except, for another reason I cannot quite put my finger on, the FFI/blocking action silently fails if its in a Thread.new block.)

For some reason, however, when I try to send the data through to the Meteor app, nothing happens. ws.send (on EventMachine) returns true, though never actually gets called, even if i put it in its own Thread.new block.

Part of me suspects (though cant figure out how to test it) that the connection is lost because the Ruby app cannot deal with the ping/pong keepalive requests during the blocking.

I've tried EM.spawn from EventMachine for the blocking process, I've tried launching EventMachine in its own thread, but nothing seems to work.

Curious if there are best-practices for something like this, to be able to keep the EventMachine portion of the app responsive even during a CPU-intensive blocking operations?

Upvotes: 2

Views: 719

Answers (1)

Myst
Myst

Reputation: 19221

EDITED

After our discussion in the comments, I decided to review the code I posted and write a small DDP encapsulation leveraging Iodine's Websocket Client (which I prefer, since I'm the author).

I have to admit, I really had fun thinking about this question. Attached is a simplified code for a Meteor connector using Iodine. This is truly basic and includes only: renewing the connection if dropped, completing the handshake and answering ping-pongs.

To use this code together with the FFI workflow concept initiated in the first answer, use:

# create the connection to the Meteor server
# and setup the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
    Iodine.debug "got message #{message}, it's a Hash"
end

# next, create a dedicated thread for the FFI,
# it will run until the FFI had finished
# or the application exits.
Thread.new do
   # initialize FFI interface
   data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
   # imagine it takes time
   sleep 1
   # Meteor will respond to these with error messages 
   (meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
   sleep 1
   Iodine.signal_exit
end

# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }

As to the Meteor DDP connection class, this could probably be achieved like so:

require 'iodine/client'

class IodineDDP
   attr_reader :session
   attr_reader :server_id   
   def initialize url, &block
      @url = url
      @ddp_initialized = false
      @session = nil
      @server_id = nil
      @block = block
      @closed = false
      connect_websocket
   end

   def << message
      Iodine.debug "Writing message #{message}"
      ensure_connection
      @ws << message
   end
   alias :write :<<

   def close
      @closed = true
      @ws.on_close { nil }
      @ws.close
   end

   protected

   def on_message data
      # make sure the DDP handshake is complete
      return handshake data unless @ddp_initialized
      data = JSON.parse(data)
      Iodine.debug "Got message: #{data}"
      return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
      return true if data['msg'] == 'pong'
      @block.call data
   end
   def on_close
      @ddp_initialized = false
      connect_websocket
   end

   def ensure_connection
      return true unless @ws.closed? || !@ddp_initialized
      raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
      raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
      timeout = Iodine.time + 3
      sleep 0.2 until @ddp_initialized && Iodine.time <= timeout
      raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
   end

   def connect_websocket
      @___on_message_proc ||= method(:on_message)
      @___on_close_proc ||= method(:on_close)
      @ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
      # inform
      Iodine.debug "initiating a new DDP connection to #{@url}"
      # start the DDP handshake
      handshake
   end
   def handshake last_message = nil
      raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
      unless last_message # this is the first message sent
         Iodine.debug "Meteor DDP handshake initiated."
         msg = {msg: "connect", version: "1", support: ["1"]}
         msg[:session] = @session if @session
         return(@ws << msg.to_json)
      end
      message = JSON.parse(last_message)
      raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
      if message['msg'] == 'connected'
         # inform
         Iodine.debug "Meteor DDP handshake complete."
         @session = message['session']
         return @ddp_initialized = true
      else
         return @server_id = message['server_id'] if message['server_id'] 
         Iodine.error "Invalid handshake data - closing connection."
         close
      end      
   end
end

# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3

# # if we are inside a larger application, call: 
# Iodine.force_start! 

# # if we are on irb:
exit
# no need to write anything if this is the whole of the script

Upvotes: 1

Related Questions