maasha

Reputation: 1995

Ruby: Synchronizing fork pool output

I am trying to create a generic way of iterating over Enumerables using multiple processors. I spawn a given number of workers using fork and feed them data to process, reusing idle workers. However, I would like the output order to match the input order. If job 1 and job 2 are started simultaneously and job 2 completes before job 1, the results come out in the wrong order. I would like to cache the output on the fly to restore the order, but I fail to see how this can be done.

#!/usr/bin/env ruby

require 'pp'

DEBUG = false
CPUS  = 2

module Enumerable
  # Fork each (feach) creates a fork pool with a specified number of processes
  # to iterate over the Enumerable object processing the specified block.
  # Calling feach with :processes => 0 disables forking for debugging purposes.
  # It is possible to disable synchronized output with :synchronize => false
  # which will save some overhead.
  #
  # @example - process 10 elements using 4 processes:
  #
  # (0 ... 10).feach(:processes => 4) { |i| puts i; sleep 1 }
  def feach(options = {}, &block)
    $stderr.puts "Parent pid: #{Process.pid}" if DEBUG

    procs = options[:processes]   || 0
    sync  = options.fetch(:synchronize, true) # `|| true` would ignore an explicit false

    if procs > 0
      workers = spawn_workers(procs, &block)
      threads = []

      self.each_with_index do |elem, index|
        $stderr.puts "elem: #{elem}    index: #{index}" if DEBUG

        threads << Thread.new do 
          worker = workers[index % procs]
          worker.process(elem)
        end

        if threads.size == procs
          threads.each { |thread| thread.join }
          threads = []
        end
      end

      threads.each { |thread| thread.join }
      workers.each { |worker| worker.terminate }
    else
      self.each do |elem|
        block.call(elem)
      end
    end
  end

  def spawn_workers(procs, &block)
    workers = []

    procs.times do 
      child_read, parent_write = IO.pipe
      parent_read, child_write = IO.pipe

      pid = Process.fork do
        begin
          parent_write.close
          parent_read.close
          call(child_read, child_write, &block)
        ensure
          child_read.close
          child_write.close
        end
      end

      child_read.close
      child_write.close

      $stderr.puts "Spawning worker with pid: #{pid}" if DEBUG

      workers << Worker.new(parent_read, parent_write, pid)
    end

    workers
  end

  def call(child_read, child_write, &block)
    until child_read.eof?
      elem = Marshal.load(child_read)
      $stderr.puts "      call with Process.pid: #{Process.pid}" if DEBUG
      result = block.call(elem)
      Marshal.dump(result, child_write)
    end
  end

  class Worker
    attr_reader :parent_read, :parent_write, :pid

    def initialize(parent_read, parent_write, pid)
      @parent_read  = parent_read
      @parent_write = parent_write
      @pid          = pid
    end

    def process(elem)
      Marshal.dump(elem, @parent_write)
      $stderr.puts "   process with worker pid: #{@pid} and parent pid: #{Process.pid}" if DEBUG
      Marshal.load(@parent_read)
    end

    def terminate
      $stderr.puts "Terminating worker with pid: #{@pid}" if DEBUG
      Process.wait(@pid, Process::WNOHANG)
      @parent_read.close
      @parent_write.close
    end
  end
end

def fib(n) n < 2 ? n : fib(n-1)+fib(n-2); end # Lousy Fibonacci calculator <- heavy job

(0 ... 10).feach(processes: CPUS) { |i| puts "#{i}: #{fib(35)}" }

Upvotes: 2

Views: 615

Answers (1)

There is no way to sync the output unless you force all the child processes to send their output to the parent and have it sort the results, or you enforce some kind of I/O locking between processes.
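The first option (send everything to the parent and let it restore the order) could look roughly like the sketch below. It is not the code from the question: for brevity it forks one child per element instead of reusing workers, and the names OrderedBuffer and ordered_fork_map are made up for illustration. Each child sends [index, result] through a pipe, and the parent buffers anything that arrives early and releases results only once they are contiguous. It assumes a platform with fork and results that can be marshalled.

```ruby
# Hypothetical helper: buffers results that arrive early and releases
# them in index order as soon as they become contiguous.
class OrderedBuffer
  def initialize
    @pending = {}
    @next    = 0
  end

  # Store one result; return the (possibly empty) list of results
  # that can now be emitted in input order.
  def push(index, result)
    @pending[index] = result
    ready = []
    while @pending.key?(@next)
      ready << @pending.delete(@next)
      @next += 1
    end
    ready
  end
end

# Hypothetical helper: runs the block in forked children, at most
# `processes` at a time, and returns the results in input order.
def ordered_fork_map(items, processes: 2, &block)
  buffer  = OrderedBuffer.new
  running = {} # read end of a pipe => pid of the child writing to it
  ordered = []

  # Wait for at least one child to report, then drain every ready pipe.
  collect = lambda do
    ready, = IO.select(running.keys)
    ready.each do |reader|
      pid = running.delete(reader)
      index, result = Marshal.load(reader)
      reader.close
      Process.wait(pid)
      buffer.push(index, result).each { |r| ordered << r }
    end
  end

  items.each_with_index do |item, index|
    collect.call while running.size >= processes

    reader, writer = IO.pipe
    pid = fork do
      reader.close
      Marshal.dump([index, block.call(item)], writer)
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close # the parent keeps only the read end
    running[reader] = pid
  end

  collect.call until running.empty?
  ordered
end
```

To print as soon as possible rather than collect, replace `ordered << r` with a `puts` in the parent; the children must then return their values instead of printing them. If a child raises, it writes nothing and the parent's Marshal.load fails with EOFError, so real code would need error handling there.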

Without knowing what your long-term goal is, it's difficult to suggest a solution. In general, each process needs a lot of work to gain any significant speedup from fork, and there is no simple way to get results back to the main program.

Native threads (pthreads on Linux) might make more sense for what you are trying to do; however, not all versions of Ruby support threads at that level. See:

Does ruby have real multithreading?
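For comparison, here is a small sketch of why ordering is easier with threads: results stay in the same process, so joining the threads in input order is enough. The helper name threaded_map is made up for illustration. Note that on MRI (CRuby) the global VM lock keeps Ruby code in different threads from running in parallel, so a CPU-bound job like fib would only speed up on an implementation with parallel threads, such as JRuby.

```ruby
# Hypothetical helper: one thread per element, results in input order.
def threaded_map(items, &block)
  threads = items.map do |item|
    # Pass item as a thread argument so each thread gets its own copy
    # of the reference.
    Thread.new(item) { |x| block.call(x) }
  end
  # Thread#value joins the thread and returns the block's result, so
  # mapping over the threads in creation order preserves input order.
  threads.map(&:value)
end
```

This starts one thread per element, which is fine for a handful of items; a longer list would call for a fixed pool of threads pulling indexed jobs from a Queue.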

Upvotes: 1
