Benedikt B

Reputation: 753

Ruby 1.8.7: Forks & Pipes - Troubleshooting

I'm aware that there are great gems like Parallel, but I came up with the class below as an exercise.

It works fine, but over many iterations Ruby sometimes gets "stuck". When I press CTRL+C, the backtrace always points to line 38 or 45 (the two Marshal lines). Can you see anything wrong here? The pipes seem to be "hanging", so I suspect I'm using them incorrectly.

My goal was to iterate through an array (passed as 'objects') with a limited number of forks (max_forks) and to return some values. Additionally, I wanted to guarantee that all children get killed when the parent gets killed (even in case of kill -9), which is why I introduced the "life_line" pipe (I've read here on Stack Overflow that this might do the trick).

class Parallel

  def self.do_fork(max_forks, objects)
    waiter_threads = []
    fork_counter = []

    life_line = {}
    comm_line = {}

    objects.each do |object|
      key = rand(24 ** 24).to_s(36)

      sleep(0.01) while fork_counter.size >= max_forks

      if fork_counter.size < max_forks
        fork_counter << true

        life_line[key] = {}
        life_line[key][:r], life_line[key][:w] = IO.pipe

        comm_line[key] = {}
        comm_line[key][:r], comm_line[key][:w] = IO.pipe

        pid = fork {
          life_line[key][:w].close
          comm_line[key][:r].close

          Thread.new {
            begin
              life_line[key][:r].read
            rescue SignalException, SystemExit => e
              raise e
            rescue Exception => e
              Kernel.exit
            end
          }

          Marshal.dump(yield(object), comm_line[key][:w]) # return yield
        }

        waiter_threads << Thread.new {
          Process.wait(pid)

          comm_line[key][:w].close
          reply = Marshal.load(comm_line[key][:r])
          # process reply here
          comm_line[key][:r].close

          life_line[key][:r].close
          life_line[key][:w].close
          life_line[key] = nil

          fork_counter.pop 
        }
      end
    end

    waiter_threads.each { |k| k.join } # wait for all threads to finish
  end
end
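
For reference, the "life_line" idea described above can be sketched in isolation. This is only a minimal illustration under assumptions: spawn_with_life_line is a hypothetical helper, not part of the class above, and it treats the end of the read (EOF) as the signal that the parent is gone, since a read on a pipe returns normally once every write end has been closed.

```ruby
# Hypothetical helper (not from the original post): fork one child that
# exits as soon as the parent's end of a "life line" pipe disappears.
def spawn_with_life_line
  life_r, life_w = IO.pipe
  pid = fork do
    life_w.close              # the child must not keep the write end open itself
    Thread.new do
      life_r.read             # returns at EOF, i.e. once every write end is closed
      exit!(1)                # the parent is gone (even after kill -9): leave at once
    end
    yield                     # the actual work of the child
    exit!(0)
  end
  life_r.close                # the parent only needs the write end
  [pid, life_w]               # keep life_w referenced for as long as the parent lives
end

pid, life_w = spawn_with_life_line { 21 * 2 }
Process.wait(pid)
life_w.close
```

Note that when several children are forked this way, each later child inherits the write ends of the earlier life lines that are still open in the parent, so it would have to close those as well; the sketch covers a single child only.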

Upvotes: 0

Views: 478

Answers (2)

Benedikt B

Reputation: 753

The bug was this:

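A pipe has a limited buffer (e.g. 64 KB). Once the buffer is full, further writes block until someone reads from the other end. In the code above the parent only reads after Process.wait(pid) returns, so a child that writes more than the buffer holds never exits, and both processes stay "stuck" forever.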
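
The limit can be observed directly. The following is a rough sketch, assuming a platform where write_nonblock raises Errno::EAGAIN once the pipe is full (as on Linux); pipe_capacity is a name made up for this illustration, and the number it reports depends on the operating system.

```ruby
# Hypothetical helper: count how many bytes a pipe accepts while nobody reads.
def pipe_capacity
  reader, writer = IO.pipe
  chunk = "x" * 1024
  written = 0
  begin
    # Keep writing without blocking until the kernel refuses more data.
    loop { written += writer.write_nonblock(chunk) }
  rescue Errno::EAGAIN
    # The buffer is full. A normal (blocking) write would hang at this point
    # until somebody reads from the other end.
  end
  reader.close
  writer.close
  written
end

puts "pipe accepted #{pipe_capacity} bytes before a write would block"
```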

An easy solution is to start reading the pipe in a thread before the child starts writing to it.

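comm_line = IO.pipe # comm_line[0] is the read end, comm_line[1] the write end

# Buffered Pipe Reading (in case bigger than 64 KB)
reply = ""
read_buffer = Thread.new {
  # Drain the pipe while the child is still writing, so the child never blocks
  while !comm_line[0].eof?
    reply = Marshal.load(comm_line[0])
  end
}

child_pid = fork {
  comm_line[0].close # the child only writes
  Marshal.dump("HUGE DATA LARGER THAN 64 KB", comm_line[1])
  comm_line[1].close
}

Process.wait(child_pid)

comm_line[1].close # close the parent's write end so the reader thread sees EOF
read_buffer.join
comm_line[0].close

puts reply # outputs the "HUGE DATA"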
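
A variant without a reader thread is to read the pipe to EOF first and only then wait for the child. This is a different ordering from the thread-based code above, shown here as a sketch; run_in_child is a hypothetical helper name, not something from the original class.

```ruby
# Hypothetical helper: run the block in a child process and return its result.
def run_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    Marshal.dump(yield, writer)  # may block while the parent is still draining
    writer.close
    exit!(0)                     # leave without running inherited at_exit handlers
  end
  writer.close                   # otherwise the read below never sees EOF
  payload = reader.read          # drains the pipe while the child is writing
  reader.close
  Process.wait(pid)              # reap the child only after the pipe is empty
  Marshal.load(payload)
end

big = run_in_child { "x" * 200_000 }  # well above a 64 KB pipe buffer
puts big.size
```

Because the parent reads before it waits, the child can always finish its write, whatever the size of the result.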

Upvotes: 1

Niels Ganser

Reputation: 2380

I don't think the problem is with Marshal. The more obvious one seems to be that your fork may finish execution before the waiter thread gets to it, which would leave the waiter thread waiting forever.

Try changing Process.wait(pid) to Process.wait(pid, Process::WNOHANG). The Process::WNOHANG flag instructs Ruby not to block if no child (matching the given PID, if any) is available. Note that this may not be available on all platforms, but at the very least it should work on Linux.
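
To illustrate what the flag does, here is a small sketch (wait_nonblocking_demo is a made-up name for this example). With Process::WNOHANG the call returns nil right away while the child is still running, so the caller has to poll or wait again later to actually reap the child.

```ruby
# Hypothetical demo: compare a non-blocking wait with a blocking one.
def wait_nonblocking_demo
  pid = fork do
    sleep 0.2
    exit!(0)
  end
  early = Process.wait(pid, Process::WNOHANG) # child still asleep: nil, no blocking
  late  = Process.wait(pid)                   # blocks until the child exits
  [early, late == pid]
end

early, reaped = wait_nonblocking_demo
puts "non-blocking wait returned #{early.inspect}, blocking wait reaped the child: #{reaped}"
```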

There are a number of other potential problems with your code, but if you just came up with it "as an exercise", they probably don't matter. For example, Marshal.load does not like to encounter EOFs, so I'd probably guard against those with something like Marshal.load(comm_line[key][:r]) unless comm_line[key][:r].eof?, or loop until comm_line[key][:r].eof? if you expect several objects to be read.
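
The EOF guard could look roughly like this. It is a sketch only; load_all is a hypothetical helper, and it assumes the write end gets closed at some point so that eof? can return true.

```ruby
# Hypothetical helper: read every marshalled object from io until EOF.
def load_all(io)
  results = []
  # eof? blocks until data arrives or all write ends are closed, so
  # Marshal.load is never called on an exhausted pipe.
  results << Marshal.load(io) until io.eof?
  results
end

reader, writer = IO.pipe
Marshal.dump({ :a => 1 }, writer)
Marshal.dump([1, 2, 3], writer)
writer.close
p load_all(reader)
reader.close
```

With an empty pipe the helper simply returns an empty array instead of raising.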

Upvotes: 0
