user7055375

How do I catch an error from a thread and then re-throw that error when all the threads have completed?

I'm using Rails 5. I have this gem for managing threads ...

gem 'concurrent-ruby'

I've noticed that if one of my threads raises an error, it is silently swallowed and I never find out about it. I tried this in a console:

pool = Concurrent::FixedThreadPool.new(1)
  # => #<Concurrent::FixedThreadPool:0x007fe3585ab368 @__lock__=#<Thread::Mutex:0x007fe3585ab0c0>, @__condition__=#<Thread::ConditionVariable:0x007fe3585ab098>, @min_length=1, @max_length=1, @idletime=60, @max_queue=0, @fallback_policy=:abort, @auto_terminate=true, @pool=[], @ready=[], @queue=[], @scheduled_task_count=0, @completed_task_count=0, @largest_length=0, @ruby_pid=23932, @gc_interval=30, @next_gc_time=252232.13299, @StopEvent=#<Concurrent::Event:0x007fe3585aaf30 @__lock__=#<Thread::Mutex:0x007fe3585aaeb8>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aae90>, @set=false, @iteration=0>, @StoppedEvent=#<Concurrent::Event:0x007fe3585aadc8 @__lock__=#<Thread::Mutex:0x007fe3585aad78>, @__condition__=#<Thread::ConditionVariable:0x007fe3585aad50>, @set=false, @iteration=0>> 
nums = [1, 2, 3] # assumed from the return value of nums.each shown below
nums.each do |num|
  pool.post do
    if num == 1
      asdfasdf
    end
  end
end
  # => [1, 2, 3] 
pool.shutdown             # => true 
pool.wait_for_termination # => true 

I was wondering how, if one of the threads from my pool throws an error, I can throw an exception when all the threads have completed, halting my program. If none of the threads throws an error, then I'm fine to continue with whatever was happening.

Above, you'll notice I intentionally create a condition that should result in an error (calling the undefined method asdfasdf), but I never find out about it, presumably because the thread pool swallows the exception.

Upvotes: 5

Views: 1797

Answers (2)

ndnenkov

Reputation: 36101

To answer your question: there is no built-in way, because the library explicitly silences exceptions raised inside posted tasks and offers no configuration to change that.

A possible workaround would be to capture exceptions manually:

error = nil
pool = Concurrent::FixedThreadPool.new(1)
numbers.each do |number|
  pool.post do
    begin
      some_dangerous_action(number)
    rescue Exception => e
      error = e
      raise # still let the gem do its thing
    end
  end
end

pool.shutdown
pool.wait_for_termination

raise error if error
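
The same capture-then-re-raise pattern can be sketched without the gem, which makes it easier to see what is going on. The following is only an illustration under stated assumptions: it uses plain Thread and Queue from the standard library in place of Concurrent::FixedThreadPool, the helper name run_all_then_raise and its workers: parameter are made up for this example, and it rescues StandardError rather than Exception so that signals and interrupts are not captured.

```ruby
# Hypothetical helper (not part of concurrent-ruby): runs every job on a
# small pool of plain threads, waits for all of them to finish, and only
# then re-raises the first exception that was captured.
def run_all_then_raise(jobs, workers: 2)
  queue  = Queue.new              # thread-safe job queue
  errors = Queue.new              # thread-safe error collector
  jobs.each { |job| queue << job }
  queue.close                     # pop returns nil once the queue is drained

  threads = Array.new(workers) do
    Thread.new do
      while (job = queue.pop)
        begin
          job.call
        rescue StandardError => e
          errors << e             # record the failure and keep going
        end
      end
    end
  end

  threads.each(&:join)            # all jobs have completed at this point
  raise errors.pop unless errors.empty?
end

# Usage: the job for 1 fails, the others still run to completion.
jobs = [1, 2, 3].map do |num|
  -> { raise ArgumentError, "bad number #{num}" if num == 1 }
end

begin
  run_all_then_raise(jobs)
rescue ArgumentError => e
  puts "re-raised after all jobs finished: #{e.message}"
end
```

Collecting errors in a Queue rather than a single shared variable means that every failure is kept, so the helper could just as well raise a combined error or log all of them.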

Upvotes: 3

wjordan

Reputation: 20390

If you need built-in exception handling, you should use a higher-level abstraction instead of using thread pools directly. Refer to this comment from the author of concurrent-ruby:

Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

Here's a variation on your example using the Promise abstraction. The exception raised inside a task is stored on its promise, and value! re-raises it in the calling thread as soon as it reaches the failed promise:

require 'concurrent'
pool = Concurrent::FixedThreadPool.new(1)
promises = (1..10).map do |num|
  Concurrent::Promise.execute(executor: pool) do
    if num == 1
      asdfasdf
    else
      num
    end
  end
end
promises.map(&:value!)

# NameError: undefined local variable or method `asdfasdf' for main:Object
#     from (irb):57:in `block (2 levels) in irb_binding'
#     [...]

To re-raise an exception only after all threads have completed (not immediately upon the first exception), you can replace promises.map(&:value!) with Concurrent::Promise.zip(*promises).value!.

To store the exception in the collection result without re-raising it, you can do something like promises.map { |p| p.value || p.reason }:

# => [#<NameError: undefined local variable or method `asdfasdf' for main:Object>, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Finally, note that a fixed thread pool with only 1 thread will execute all tasks sequentially on a single thread. To execute them all in parallel (on a pool with 10 threads), change the thread-pool initializer to pool = Concurrent::FixedThreadPool.new(10).

Upvotes: 3
