Reputation:
I'm using Rails 5. I have this gem for managing threads ...
gem 'concurrent-ruby'
I notice that if one of my threads throws an error, it is just swallowed and I never find out about it. I tried this in a console
pool = Concurrent::FixedThreadPool.new(1)
# => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>>
nums.each do |num|
pool.post do
if num == 1
asdfasdf
end
end
end
# => [1, 2, 3]
pool.shutdown # => true
pool.wait_for_termination # => true
I was wondering how, if one of the threads from my pool throws an error, I can throw an exception when all the threads have completed, halting my program. If none of the threads throws an error, then I'm fine to continue with whatever was happening.
Above, you'll notice I intentionally cause a condition that should result in an error, but I never find out about it, because I guess the threadpool is swallowing the output of the exceptions.
Upvotes: 5
Views: 1797
Reputation: 36101
To answer your question - no actual way as the library explicitly silences exceptions and there is no configuration for it.
A possible workaround would be to capture exceptions manually:
error = nil
pool = Concurrent::FixedThreadPool.new(1)
numbers.each do |number|
pool.post do
begin
some_dangerous_action(number)
rescue Exception => e
error = e
raise # still let the gem do its thing
end
end
end
pool.shutdown
pool.wait_for_termination
raise error if error
Upvotes: 3
Reputation: 20390
If you need built-in exception handling, you should use a higher-level abstraction instead of using thread pools directly. Refer to this comment from the author of concurrent-ruby
:
Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.
If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an
:executor
option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.
Here's a variation on your example using the Promise
abstraction. This will re-raise an exception as soon as the thread pool raises one:
require 'concurrent'
pool = Concurrent::FixedThreadPool.new(1)
promises = (1..10).map do |num|
Concurrent::Promise.execute(executor: pool) do
if num == 1
asdfasdf
else
num
end
end
end
promises.map(&:value!)
# NameError: undefined local variable or method `asdfasdf' for main:Object
# from (irb):57:in `block (2 levels) in irb_binding'
# [...]
To re-raise an exception only after all threads have completed (not immediately upon the first exception), you can replace promises.map(&:value!)
with Concurrent::Promise.zip(*promises).value!
.
To store the exception in the collection result without re-raising it, you can do something like promises.map { |p| p.value || p.reason }
:
# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Finally, note that a fixed thread pool with only 1 thread will execute all tasks sequentially on a single thread. To execute them all in parallel (on a pool with 10 threads), change the thread-pool initializer to pool = Concurrent::FixedThreadPool.new(10)
.
Upvotes: 3