JP.

Reputation: 5594

Threading in Ruby with a limit

I have a task I need to perform, do_stuff(opts), that takes about 1 s each, even when 1 to 10 of them are running in parallel. I need to collect an array of the results of all the operations at the end.

If I have 30 of these tasks to do, how can I use threads to queue up the do_stuff(opts) operations so that no more than 10 run concurrently, and the array of results is only returned (printed, etc.) once all 30 tasks have completed?

I usually have at least some code to illustrate what I mean, but with threading I'm at a bit of a loss! Thanks in advance.

Upvotes: 6

Views: 4874

Answers (6)

jkndrkn

Reputation: 4062

Also, take a look at this tutorial if you are new to Ruby threads.

Upvotes: 1

Dorian

Reputation: 23989

I use parals and paralsmap:

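# Run the block on objects in batches of n threads. Each batch is joined
# before the next one starts. Objects are shuffled, so order is random.
def parals(objects, n: 50)
  objects.shuffle.each_slice(n) do |g|
    print '{'
    threads = g.map { |obj| Thread.new(obj) { |o| yield(o) } }
    threads.each(&:join)
    print '}'
  end
end

# Same batching, but collect the block's return values.
def paralsmap(objects, n: 50)
  res = []

  objects.each_slice(n) do |g|
    print '{'
    threads = g.map { |obj| Thread.new(obj) { |o| yield(o) } }
    # Thread#value joins each thread, so results keep the input order
    res.concat(threads.map(&:value))
    print '}'
  end

  res
end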

e.g.:

parals((0..100).to_a) { |i| puts i }
urls = paralsmap((0..100).to_a) { |i| "https://google.com/?q=#{i}" }

The n keyword sets the batch size, so at most n threads run at once; each batch is joined before the next one starts.
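Batching with each_slice means every batch waits for its slowest member before the next batch begins. A minimal sketch of a sliding-window alternative, assuming a hypothetical helper named bounded_map: a SizedQueue with capacity limit acts as a counting semaphore, so a new thread starts as soon as any running one finishes.

```ruby
# Hypothetical helper: map over items with at most `limit` threads working at once.
# A SizedQueue of capacity `limit` acts as a counting semaphore: push blocks
# while `limit` tokens are held, and each thread pops its token when done.
def bounded_map(items, limit: 10)
  slots = SizedQueue.new(limit)
  threads = items.map do |item|
    slots.push(:token)            # blocks while `limit` threads hold a token
    Thread.new(item) do |it|
      begin
        yield(it)
      ensure
        slots.pop                 # release the slot even if the block raises
      end
    end
  end
  threads.map(&:value)            # join all threads; results in input order
end

# Usage: the block is a stand-in for do_stuff
squares = bounded_map((1..30).to_a, limit: 10) { |i| sleep 0.1; i * i }
p squares
```

Thread#value re-raises any exception from a worker, so a failing task surfaces in the calling thread once the results are collected.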

Upvotes: 0

Darren Weber

Reputation: 1674

This solution gathers results in the $results array. It allows up to thread_limit threads to be created, then waits for all of them to complete before creating any more.

   $results = []

   def do_stuff(opts = {})
     'done'
   end

   # Join every thread in the batch, store its return value, then empty the batch
   def thread_wait(threads)
     threads.each(&:join)
     threads.each { |t| $results << t.value }
     threads.clear
   end

   records = (1..30).to_a   # placeholder for the real list of work items
   opts = {}
   thread_limit = 20
   threads = []
   records.each do |r|
     thread_wait(threads) while threads.length >= thread_limit
     t = Thread.new { do_stuff(opts) }
     t.abort_on_exception = true
     threads << t
   end
   # Ensure the remaining threads complete and their results are collected
   thread_wait(threads)
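When worker threads append to one shared array themselves (rather than handing values back through Thread#value), wrapping the append in a Mutex keeps it safe on Ruby implementations without a global lock, such as JRuby. A minimal sketch, assuming do_stuff is a placeholder that sleeps and returns its :id option:

```ruby
# Placeholder standing in for the real ~1 s task.
def do_stuff(opts = {})
  sleep 0.1
  opts[:id]
end

results = []
lock = Mutex.new

(1..30).each_slice(10) do |batch|             # at most 10 threads alive per batch
  threads = batch.map do |id|
    Thread.new(id) do |n|
      value = do_stuff(id: n)
      lock.synchronize { results << value }   # only one thread appends at a time
    end
  end
  threads.each(&:join)
end

p results.size
```

Results land in completion order, not input order; sort them or store [index, value] pairs if order matters.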

Upvotes: 1

Nat

Reputation: 3077

I don't know how well it'll work for a more complex application, but I found something like this works nicely for a simple threading scenario with MacRuby.

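thread_limit = 4

threads = []
things_to_process.each do |thing|
  # Wait while thread_limit threads are still alive (running or blocked on I/O);
  # counting only status "run" would miss threads sleeping on I/O
  sleep 0.1 until threads.count(&:alive?) < thread_limit
  threads << Thread.new { the_task(thing) }
end
# Thread#value joins each thread and returns its result, in input order
output = threads.map(&:value)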

The until loop makes the main thread wait until fewer than thread_limit of the created threads are still running, and only then starts the next thread.

The output variable is assigned an array of the values returned by the_task, in the same order as the input array things_to_process. Because Thread#value waits for its thread to finish, the main thread blocks until every created thread has returned a value.
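Thread#status is easy to misread when limiting concurrency: a thread blocked in sleep, I/O, or a Queue reports "sleep", not "run", so a limit based on counting "run" lets extra threads start. Thread#alive? is true in both states. A small sketch that shows the difference, using a Queue to block a thread deterministically:

```ruby
q = Queue.new
t = Thread.new { q.pop }             # blocks until something is pushed
Thread.pass until t.status == "sleep"

p t.status    # a blocked thread reports "sleep", not "run"
p t.alive?    # but it is still alive and still occupies a slot

q << :go      # unblock the thread
t.join
p t.status    # false once it has finished normally
p t.alive?
```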

Upvotes: 6

Marc Seeger

Reputation: 2737

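If you're really after performance, you might also want to look into JRuby.
It runs Ruby threads on JVM (OS) threads without a global interpreter lock, so they can execute Ruby code in parallel. MRI 1.8 used green threads; MRI 1.9 and later use OS threads, but a global VM lock lets only one of them run Ruby code at a time.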
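For I/O-bound work like the ~1 s do_stuff in the question, MRI threads still overlap, because the global VM lock is released while a thread sleeps or waits on I/O; only CPU-bound Ruby code is serialized. A minimal sketch, assuming sleep as a stand-in for a blocking network call:

```ruby
# Ten threads each "work" for 0.5 s; sleep stands in for a blocking I/O call.
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
threads = Array.new(10) { Thread.new { sleep 0.5 } }
threads.each(&:join)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

# Sequentially this would take about 5 s; overlapped it should be close to 0.5 s.
puts format("10 x 0.5 s sleeps took %.2f s", elapsed)
```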

Upvotes: 1

Aditya Mukherji

Reputation: 9256

You need to implement this pattern.
This question discusses how that can be done in Ruby.
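One widely used pattern for capping concurrency is a fixed-size worker pool: a set number of threads pull jobs from a shared Queue until it is drained. A minimal sketch, with do_stuff and the jobs list as hypothetical placeholders; each worker keeps its own [index, result] pairs so no shared array is written concurrently:

```ruby
# Placeholder for the real ~1 s task.
def do_stuff(opts)
  sleep 0.1
  opts[:n] * 10
end

jobs = (1..30).map { |n| { n: n } }   # hypothetical options for each task
pool_size = 10

queue = Queue.new
jobs.each_with_index { |opts, i| queue << [opts, i] }
queue.close                           # pop returns nil once closed and empty

workers = Array.new(pool_size) do
  Thread.new do
    done = []                         # results local to this worker
    while (job = queue.pop)
      opts, i = job
      done << [i, do_stuff(opts)]
    end
    done
  end
end

# Thread#value waits for each worker; place results back in input order
results = Array.new(jobs.size)
workers.each { |w| w.value.each { |i, v| results[i] = v } }
p results
```

With 30 jobs and 10 workers, no more than 10 tasks ever run at once, and results is only built after every worker has finished.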

Upvotes: 4
