kraftydevil

Reputation: 5246

How to reprocess items in threads that throw exceptions in Ruby?

I'm processing items with threads and queues, but sometimes an exception is thrown for a particular item and so it doesn't get processed.

My idea is to put the offending item back into the queue so it can be processed again and keep putting it back until an exception is no longer thrown.

Despite my intention to reprocess items, this code only processes the queue once:

#assume this queue is immediately filled with items
item_queue = Queue.new 

define_method(:processItem) { |item|
  begin
    # do something with item
  # Bad style below: will add specific exception handling later
  rescue Exception => ex
    # something happened, so put it back in the queue
    item_queue << item
    return
  end
  # more processing here, if 'begin' was successful
}

threads = []    

until item_queue.empty?
  threads << Thread.new{ processItem(item_queue.pop) }
end

threads.each{|thread| thread.join}

My thinking was that Queue is thread-safe, so it could be used like this, but the results show otherwise.

How can I ensure that all items producing an exception get reprocessed until all items have been successful?

Upvotes: 0

Views: 32

Answers (1)

Abdulrazak Alkl

Reputation: 923

Yes, Queue is thread-safe, but the way you are using it is not.

item_queue.empty? might return true before the threads finish, so an item that a failing thread pushes back afterwards may never be picked up again.

Calling Thread#join inside the until item_queue.empty? loop solves the race condition, but you end up with a program that processes only one item at a time from the queue.
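To make the race concrete, here is a minimal sketch. The popped and resume queues are hypothetical signalling helpers I added only to force the timing; they are not part of the question's code.

```ruby
queue = Queue.new
queue << :item

popped = Queue.new # worker tells the main thread it has taken the item
resume = Queue.new # main thread tells the worker to carry on

worker = Thread.new do
  item = queue.pop
  popped << true
  resume.pop    # wait until the main thread has looked at the queue
  queue << item # simulate a failure: the item goes back into the queue
end

popped.pop
# The item is still being worked on, yet the queue already looks finished.
empty_while_in_flight = queue.empty?
resume << true
worker.join
size_after_worker = queue.size

puts empty_while_in_flight # true
puts size_after_worker     # 1
```

A loop that only checks empty? would have stopped at the first puts, leaving the re-queued item unprocessed.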

until item_queue.empty?
  Thread.new{ processItem(item_queue.pop) }.join
end

If you want the items in the queue to be processed by several threads at once, you need to decide up front how many threads you want, for example:

# Three threads processing items from the queue on each pass
until item_queue.empty?
  t1 = Thread.new { processItem(item_queue.pop) }
  t2 = Thread.new { processItem(item_queue.pop) }
  t3 = Thread.new { processItem(item_queue.pop) }
  # join(1) waits at most one second for each thread before the queue is checked again
  t1.join(1)
  t2.join(1)
  t3.join(1)
end
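An alternative that avoids checking empty? altogether is a fixed pool of long-lived workers plus a counter of items that have not yet succeeded. This is only a sketch under my own assumptions: process_all, worker_count and the pending counter are hypothetical names, items must not be nil or false, and it relies on Queue#close (Ruby 2.3+), which makes a blocked pop return nil.

```ruby
# Runs the block for every item on worker_count threads and
# re-queues an item whenever the block raises.
def process_all(items, worker_count: 3, &block)
  queue = Queue.new
  items.each { |item| queue << item }
  results = Queue.new
  pending = items.size # items that have not succeeded yet
  lock = Mutex.new
  queue.close if items.empty?

  workers = Array.new(worker_count) do
    Thread.new do
      # pop returns nil once the queue is closed and drained
      while (item = queue.pop)
        begin
          value = block.call(item)
        rescue StandardError
          queue << item # failed: put it back for another attempt
          next
        end
        results << value
        # The last successful item closes the queue, which releases all workers.
        queue.close if lock.synchronize { (pending -= 1).zero? }
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

# Usage: even numbers fail on their first attempt and succeed on the retry.
attempts = Hash.new(0)
attempts_lock = Mutex.new
results = process_all([1, 2, 3, 4, 5]) do |n|
  count = attempts_lock.synchronize { attempts[n] += 1 }
  raise "transient failure for #{n}" if n.even? && count == 1
  n * 10
end
p results.sort # [10, 20, 30, 40, 50]
```

Note that an item which fails every time would be retried forever, so in real code you would probably want a retry limit or a delay between attempts.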

Upvotes: 2
