Zubatman

Reputation: 1305

Ruby Thread Pooling - What am I doing wrong?

I have 2.5 million records in the Content table of my Postgres database. I need to go through each of those records, perform a number of actions (many of which are slow by themselves), and update the record at the end based on what I have gathered along the way. That all works; the problem is that it takes for bloody ever to run.

I came across a couple of articles about multithreading such jobs (I have done this before in C, but never in Ruby) and the pros and cons of using threads in Ruby. Despite those cons, the roughly 2000 threads I can spawn finish significantly faster than running without threading, but I can only get about 2000 off at one time, which keeps me from actually updating all 2.5 million records. Here is the code I had for that:

threads = []
index = 0

Content.all.each do |content|
  # One thread per record
  threads << Thread.new do
    grab_and_store(content)
  end
  index += 1
  puts index if index % 100 == 0   # progress output every 100 records
end
threads.each(&:join)

I also read about thread pooling, reusing the same threads for new jobs once they have completed their original ones, but I can't seem to get it to work. Here is the code I had:

POOL_SIZE = 1000

jobs = Queue.new
Content.all.each { |x| jobs.push x }

workers = POOL_SIZE.times.map do
  Thread.new do
    begin
      # Non-blocking pop raises ThreadError once the queue is empty,
      # which is what ends each worker's loop
      while x = jobs.pop(true)
        grab_and_store(x)
      end
    rescue ThreadError
    end
  end
end
workers.map(&:join)

When I run this I get an error saying I can't call .join on nil, which would mean that workers is nil at the end. But when I take the code that I based this off of (shown below, and source) and run that, it works perfectly. I can't figure out where mine is breaking, or how best to implement the thread pool so my code stops running out of resources after about 2000 threads.
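For reference, a minimal fixed-size pool built only on the standard library looks something like the sketch below (assuming the same grab_and_store and Content as above); it uses blocking pops and nil sentinels instead of the pop(true)/rescue ThreadError pattern:

require 'thread'

POOL_SIZE = 50

jobs = Queue.new
Content.all.each { |content| jobs.push(content) }
POOL_SIZE.times { jobs.push(nil) }   # one nil sentinel per worker marks the end of the queue

workers = POOL_SIZE.times.map do
  Thread.new do
    # Blocking pop; a nil sentinel ends the loop
    while (content = jobs.pop)
      grab_and_store(content)
    end
  end
end

workers.each(&:join)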

Thanks!

P.S. Here's the code from the tutorial I used:

require 'thread'
work_q = Queue.new
(0..50).to_a.each{|x| work_q.push x }
workers = (0...4).map do
  Thread.new do
    begin
      while x = work_q.pop(true)
        50.times{print [128000+x].pack "U*"}
      end
    rescue ThreadError
    end
  end
end; "ok"
workers.map(&:join); "ok"

Update:

Per Anthony's answer, I ended up with the following chunk of code using the ruby-thread gem he recommended. It runs through the given Content really quickly (a sample of 1000 records), but when I check the console it appears to have saved only around 20 at most. Here's the code:

require 'thread/pool'

pool = Thread.pool(5)

@ids = []
index = 0
arr = Content.where(needs_update: true)[0...1000]

puts "Starting With Sample 1000"

arr.each do |content|
  pool.process do
    grab_and_store(content)
  end
  index += 1
  puts index if index % 100 == 0   # progress output every 100 records
end

pool.shutdown
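If the missing saves come from database access inside the worker threads (an assumption on my part, not something I've confirmed), the usual fix is to have each pooled task check out its own ActiveRecord connection, and to make sure the pool size in database.yml is at least as large as the thread pool. A sketch of that variant, assuming grab_and_store as above:

require 'thread/pool'

pool = Thread.pool(5)

arr = Content.where(needs_update: true)[0...1000]

arr.each do |content|
  pool.process do
    # Each pooled task checks a connection out of ActiveRecord's pool
    # and returns it when the block finishes
    ActiveRecord::Base.connection_pool.with_connection do
      grab_and_store(content)
    end
  end
end

pool.shutdown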

Upvotes: 4

Views: 8298

Answers (1)

Anthony

Reputation: 15957

I've used the ruby-thread gem, which adds pool support, like so:

require 'thread/pool'

pool = Thread.pool(50)

Content.all.each do |content|
  pool.process do
    grab_and_store(content)
  end
end

pool.shutdown
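With this approach only the 50 pool threads ever exist: pool.process just enqueues the block and an idle worker picks it up, so all 2.5 million records can be fed through without the resource blow-up that comes from spawning a thread per record. (As an aside, Content.find_each would load the records in batches of 1000 instead of all at once, which matters at that table size.)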

Upvotes: 10
