Reputation: 5246
I'm processing items with threads and queues, but sometimes an exception is thrown for a particular item and so it doesn't get processed.
My idea is to put the offending item back into the queue so it can be processed again and keep putting it back until an exception is no longer thrown.
Despite my intention to reprocess items, this code only processes the queue once:
#assume this queue is immediately filled with items
item_queue = Queue.new
define_method (:processItem) {|item|
begin
#do something with item
#Bad style below: will work in specific exception handling later
rescue Exception => ex
#something happened, so put it back in the queue
item_queue << item
return
end
#more processing here, if 'begin' was successful
}
threads = []
until item_queue.empty?
threads << Thread.new{ processItem(item_queue.pop) }
end
threads.each{|thread| thread.join}
My thinking was that Queue
is threadsafe so it could be used like this - but the results show otherwise.
How can I ensure that all items producing an exception get reprocessed until all items have been successful?
Upvotes: 0
Views: 32
Reputation: 923
Yes Queue
is a threadsafe but the way you use it it's not safe.
item_queue.empty?
might return true
before the thread finish.
Calling Thread.join
inside until item_queue.empty?
will solve the race condition problem but will end up with a program that process one item at a time from the queue.
until item_queue.empty?
Thread.new{ processItem(item_queue.pop) }.join
end
If you want the items in the queue to be processed in multithreading way then you need to predefine how many threads you want ex:
# three threads processing items in the queue
until item_queue.empty?
t1 = Thread.new{ processItem(item_queue.pop) }
t2 = Thread.new{ processItem(item_queue.pop) }
t3 = Thread.new{ processItem(item_queue.pop) }
t1.join 1
t2.join 1
t3.join 1
end
Upvotes: 2