Reputation: 47609
I have an application that spins up a number of futures to do prolonged work. It's intermittently failing and I'm trying to work out why.
The symptom is that the code simply stops executing, at a seemingly random place. My future-creation code is something like this:
(def future-timeout
  ; 1 hour
  3600000)

(def concurrency 200)

(defn do-parallel
  [f coll]
  (let [chunks (partition-all concurrency coll)]
    (doseq [chunk chunks]
      (let [futures (doall
                      (map #(future
                              (try
                                (f %)
                                (catch Exception e
                                  (log/error "Unhandled error in do-parallel:" (.getMessage e))
                                  :exception)))
                           chunk))
            results (doall (map #(deref % future-timeout :timeout) futures))
            all-ok  (every? true? results)]
        (when all-ok
          (log/info "Chunk successful."))
        (when-not all-ok
          (log/error "Chunk unsuccessful.")
          (log/warn "Parallel execution results:" results))
        (swap! chunk-count inc)))
    (log/info "Finished batch")))
The concurrency variable controls the size of batches, and therefore the number of concurrent executions it attempts. f returns true on success. If there's a timeout or an exception, the result for that item is :timeout or :exception.
I'm doing this instead of pmap because I want to control concurrency: f is a long-running (~10 minutes), network-intensive task. pmap seems to be tuned toward mid-sized, smaller batches.
Normally this works fine. But after a few hours it stops:
- f, the function, stops running.
- do-parallel stops and no more log entries appear.

Any ideas of what might be causing this? Or steps to put in place to help diagnose?
Upvotes: 1
Views: 315
Reputation: 89
Maybe try catching Throwable instead of Exception? I've had problems before that slipped through catch Exception for exactly that reason: JVM errors such as OutOfMemoryError or StackOverflowError extend Throwable but not Exception.
I think if there's an uncaught exception in a future, the future captures it and the thread dies without throwing it any further out, so setting the default uncaught exception handler isn't going to help. Untested, but that's my gut feel.
Do you at least get the "Chunk unsuccessful" message at the end, when it stops? Because if you don't, then that's really weird...
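To illustrate the Throwable suggestion, here is a minimal sketch. The helper name run-guarded is hypothetical and not from the question's code; it just wraps a task so that any Throwable, including an Error, is turned into :exception instead of escaping the try.

```clojure
;; Hypothetical helper: run (f x) in a future and convert any Throwable
;; (Exception or Error) into the :exception marker used in the question.
(defn run-guarded
  [f x]
  (future
    (try
      (f x)
      (catch Throwable t
        ;; println stands in for the question's log/error call.
        (println "Unhandled error:" (.getName (class t)) (.getMessage t))
        :exception))))

;; AssertionError is an Error, so `catch Exception` would not catch it,
;; but `catch Throwable` does.
(def guarded-result
  @(run-guarded (fn [_] (throw (AssertionError. "simulated failure"))) 1))
```

With catch Exception in place of catch Throwable, the deref would instead throw a java.util.concurrent.ExecutionException wrapping the error, which would abort the surrounding doseq.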
Looking at the implementation of future: it uses a cached thread pool underneath, which doesn't have a thread limit, so you're probably better off using an ExecutorService directly, or something like Claypoole, as the other suggestions indicate.
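As a rough sketch of using an ExecutorService directly, the following bounds concurrency with a fixed-size pool. The function name do-bounded is made up for illustration, and per-task timeouts and logging from the question are left out.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

;; Hypothetical helper: run f over coll with at most n tasks in flight.
(defn do-bounded
  [n f coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (try
      (let [;; Clojure fns implement Callable, so they can be submitted as-is.
            tasks   (mapv (fn [x]
                            (fn []
                              (try
                                (f x)
                                (catch Throwable _ :exception))))
                          coll)
            ;; invokeAll blocks until every task has completed.
            futures (.invokeAll pool tasks)]
        (mapv (fn [^Future fut] (.get fut)) futures))
      (finally
        ;; Always release the pool's threads.
        (.shutdown pool)))))

(def bounded-results (do-bounded 2 inc [1 2 3]))
```

Unlike the chunked approach, a fixed pool starts the next task as soon as a thread becomes free, rather than waiting for the slowest item in each chunk.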
Upvotes: 0
Reputation: 10662
Claypoole is useful for controlling parallelism.
(cp/pmap (count chunk) f chunk) will create a temporary threadpool the same size as your chunk and execute all the functions in parallel.
This is just a suggestion for expressing parallelism, not an answer to your question, which is about error handling. I'm curious about that too!
Upvotes: 0
Reputation: 49
You might want to try installing an uncaught exception handler, to see if a stray exception on the Executor itself is causing work to stop.
https://github.com/pyr/uncaught has a facility for this, but it's also straightforward to do from the code directly.
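Doing it from code directly might look like the sketch below, which uses the JDK's Thread/setDefaultUncaughtExceptionHandler. The last-uncaught atom is a hypothetical addition for inspection, and println stands in for a real logger. Note that an exception thrown inside a Clojure future is captured by the future and rethrown on deref, so this handler would only see exceptions that escape on other threads.

```clojure
;; Hypothetical atom that remembers the most recent uncaught throwable.
(def last-uncaught (atom nil))

(Thread/setDefaultUncaughtExceptionHandler
  (reify Thread$UncaughtExceptionHandler
    (uncaughtException [_ thread ex]
      (reset! last-uncaught ex)
      (println "Uncaught exception on" (.getName ^Thread thread)
               ":" (.getMessage ^Throwable ex)))))

;; Demonstration on a plain thread: the handler runs on the dying thread
;; before it terminates, so it has finished by the time join returns.
(let [^Runnable task (fn [] (throw (RuntimeException. "boom")))]
  (doto (Thread. task)
    (.start)
    (.join)))
```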
Upvotes: 1