Saurabh Nanda

Reputation: 6793

Waiting on multiple asyncs reliably?

My code needs to fire multiple threads and keep track of which have finished and which are still running. I was planning on using waitAny or waitAnyCatch, but was thrown off by the following in the documentation:

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.

If that is really the case, how does one ever keep track of running / exited threads reliably?

Here's my simplified code:

chan <- newChan
currentThreadsRef <- newIORef []

-- read jobs from a channel, and run them in parallel asyncs/threads,
-- while adding all threads references to currentThreadsRef
async $ do
  jobArgs <- readChan chan
  jobAsync <- async $ runJob jobArgs
  atomicModifyIORef' currentThreadsRef $ \x -> (jobAsync:x, ())

-- wait for jobs to be finished, and remove the thread reference
-- from currentThreadsRef
waitForAllJobs currentJobsRef = do
  (readIORef currentJobsRef) >>= \case
    [] -> logDebug "All jobs exited"
    currentJobs -> do
      (exitedJob, jobResult) <- waitAnyCatch currentJobs
      atomicModifyIORef' currentJobsRef $ \x -> (filter (/= exitedJob) x, ())
      logDebug $ "Job completed with result=" <> show jobResult
      waitForAllJobs currentJobsRef

PS: Although it may not be obvious from my simplified code above, there is a reason why I cannot simply use mapConcurrently over the input-data. Actually, async-pool seems like a good fit for my use-case, but even that has the same problem with waitAny.

Upvotes: 3

Views: 274

Answers (1)

K. A. Buhr

Reputation: 50864

Here's a program that launches 1000 asyncs all set to terminate within a second and waits for them all in a loop. Compiled with ghc -O2 -threaded and run with +RTS -N, it runs in about 1.5 seconds, and none of the asyncs gets "lost":

import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as Set

main :: IO ()
main = do
  let n = 1000 :: Int
  asyncs0 <- mapM (\i -> async (threadDelay 1000000 >> return i)) [1..n]
  let loop :: Set.Set (Async Int) -> IO ()
      loop asyncs | null asyncs = return ()
                  | otherwise = do
                      (a, _i) <- waitAny (Set.toList asyncs)
                      loop (Set.delete a asyncs)
  loop (Set.fromList asyncs0)

So, as was mentioned in a comment, the documentation is referring to the fact that the first completed async in the provided list is the one that will be "returned", but if multiple asyncs have completed, the additional ones aren't "forgotten". You just need to remove the returned async from the list and re-poll, and you'll eventually get them all.
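
Since the question uses waitAnyCatch and a plain list rather than a Set, here is a minimal sketch of the same remove-and-re-poll loop in that style. The names drainAll and job are made up for illustration, and the jobs are placeholders. The sketch deliberately lets every job finish before the first wait, so several asyncs are already complete when waitAnyCatch is first called:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, waitAnyCatch)
import Control.Exception (SomeException)

-- | Wait for every async in the list, one completion at a time.
-- (Hypothetical helper; results come back in the order waitAnyCatch
-- reported them.)
drainAll :: [Async a] -> IO [Either SomeException a]
drainAll [] = return []
drainAll running = do
  (done, result) <- waitAnyCatch running
  -- Async has an Eq instance, so the finished one can be filtered out
  -- and the remaining asyncs passed to the next call.
  rest <- drainAll (filter (/= done) running)
  return (result : rest)

main :: IO ()
main = do
  -- Placeholder jobs: three succeed, one throws.
  jobs <- mapM (async . job) [1 .. 4 :: Int]
  -- Let all jobs finish before we start waiting on them.
  threadDelay 100000
  results <- drainAll jobs
  mapM_ (putStrLn . describe) results
  putStrLn ("collected " ++ show (length results) ++ " results")
  where
    job i
      | i == 3 = ioError (userError "job 3 failed")
      | otherwise = return (i * 10)
    describe (Left e) = "failed: " ++ show e
    describe (Right v) = "ok: " ++ show v
```

All four results are collected, including the failed one, even though every async had already completed before the loop started. The filter is linear in the number of remaining asyncs, so for large job counts the Set-based loop above is probably the better choice.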

So, you shouldn't have any trouble waiting on multiple asyncs with waitAny.
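
If you only need a non-blocking view of which asyncs are still running and which have exited, poll from the async package may be enough on its own. The following is a rough sketch, not something the question or the documentation prescribes; snapshot is a hypothetical helper name and the two jobs are placeholders:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel, poll)
import Data.Either (partitionEithers)

-- | Split a list of asyncs into (still running, finished) without blocking.
-- (Hypothetical helper built on poll, which returns Nothing while the
-- async is still running.)
snapshot :: [Async a] -> IO ([Async a], [Async a])
snapshot asyncs = do
  tagged <- mapM classify asyncs
  return (partitionEithers tagged)
  where
    classify a = do
      status <- poll a
      -- Left = still running, Right = finished (normally or with an exception).
      return (maybe (Left a) (const (Right a)) status)

main :: IO ()
main = do
  quick <- async (return ())
  slow <- async (threadDelay 2000000)
  -- Give the quick job time to finish; the slow one is still sleeping.
  threadDelay 100000
  (running, finished) <- snapshot [quick, slow]
  putStrLn ("running: " ++ show (length running)
            ++ ", finished: " ++ show (length finished))
  cancel slow
```

The snapshot is only valid for the instant it was taken, so it suits logging or status reporting; for collecting results, the blocking waitAny loop remains the simpler tool.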

Upvotes: 5
