Reputation: 6793
My code needs to fire multiple threads and keep track of which have finished and which are still running. I was planning on using waitAny or waitAnyCatch, but was thrown off by the following in the documentation:
If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.
If that is really the case, how does one ever keep track of running / exited threads reliably?
Here's my simplified code:
chan <- newChan
currentThreadsRef <- newIORef []

-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all thread references to currentThreadsRef
async $ do
  jobArgs <- readChan chan
  jobAsync <- async $ runJob jobArgs
  atomicModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())

-- wait for jobs to be finished, and remove the thread reference
-- from currentThreadsRef
waitForAllJobs currentJobsRef = do
  (readIORef currentJobsRef) >>= \case
    [] -> logDebug "All jobs exited"
    currentJobs -> do
      (exitedJob, jobResult) <- waitAnyCatch currentJobs
      atomicModifyIORef' currentJobsRef $ \x -> (filter (/= exitedJob) x, ())
      logDebug $ "Job completed with result=" <> show jobResult
      waitForAllJobs currentJobsRef
PS: Although it may not be obvious from my simplified code above, there is a reason why I cannot simply use mapConcurrently over the input data. Actually, async-pool seems like a good fit for my use case, but even that has the same problem with waitAny.
Upvotes: 3
Views: 274
Reputation: 50864
Here's a program that launches 1000 asyncs, all set to terminate within a second, and waits for them all in a loop. Compiled with ghc -O2 -threaded and run with +RTS -N, it runs in about 1.5 seconds, and none of the asyncs gets "lost":
import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set

main :: IO ()
main = do
  let n = 1000 :: Int
  asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
  let loop :: Set.Set (Async Int) -> IO ()
      loop asyncs | null asyncs = return ()
                  | otherwise = do
                      (a, _i) <- waitAny (Set.toList asyncs)
                      loop (Set.delete a asyncs)
  loop (Set.fromList asyncs0)
So, as was mentioned in a comment, the documentation is referring to the fact that the first completed async in the provided list is the one that will be "returned", but if multiple asyncs have completed, the additional ones aren't "forgotten". You just need to remove the returned async from the list and re-poll, and you'll eventually get them all.
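To make the "first completed Async in the list" wording concrete, here is a minimal sketch in which every async has already finished before the first waitAny call. The helper name drain is my own, not part of the async library. Because all the asyncs are complete up front, each call returns the earliest remaining one in list order, and nothing is dropped:

```haskell
import Control.Concurrent.Async (Async, async, wait, waitAny)

-- Hypothetical helper: call waitAny repeatedly, dropping the returned
-- Async each time, and collect results in the order they are reported.
drain :: [Async a] -> IO [a]
drain [] = return []
drain as = do
  (done, x) <- waitAny as
  xs <- drain (filter (/= done) as)  -- Async has an Eq instance
  return (x : xs)

main :: IO ()
main = do
  as <- mapM (\i -> async (return i)) [1 .. 5 :: Int]
  mapM_ wait as        -- make sure all five have completed first
  results <- drain as
  print results        -- prints [1,2,3,4,5]
```

If the asyncs were still running, the order would instead follow completion time, but every one of them would still be returned exactly once.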
So, you shouldn't have any trouble waiting on multiple asyncs with waitAny.
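The same pattern should carry over to waitAnyCatch, which the question uses. The following is only a sketch: drainCatch is a name I made up, and runJob here is a stand-in for the question's job function that fails on even inputs so both outcomes show up.

```haskell
import Control.Concurrent.Async (Async, async, waitAnyCatch)
import Control.Exception (ErrorCall (..), SomeException, throwIO)
import Data.Either (lefts, rights)

-- Hypothetical helper: like a waitAny loop, but an exception in a job
-- is returned as a Left value instead of being rethrown.
drainCatch :: [Async a] -> IO [Either SomeException a]
drainCatch [] = return []
drainCatch as = do
  (done, r) <- waitAnyCatch as
  rs <- drainCatch (filter (/= done) as)
  return (r : rs)

-- Stand-in for the question's runJob (an assumption): even inputs fail.
runJob :: Int -> IO Int
runJob i
  | even i    = throwIO (ErrorCall ("job " ++ show i ++ " failed"))
  | otherwise = return (i * 10)

main :: IO ()
main = do
  jobs <- mapM (async . runJob) [1 .. 6]
  results <- drainCatch jobs
  putStrLn (show (length (rights results)) ++ " succeeded, "
            ++ show (length (lefts results)) ++ " failed")
  -- prints "3 succeeded, 3 failed"
```

Each job is reported once whether it returned or threw, so the bookkeeping in the question (remove the returned Async, then wait again) accounts for every thread.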
Upvotes: 5