Damian Nadales

Reputation: 5037

Collecting the Async results as they become available

How would you collect the results of a list of Async a values in Haskell as they become available? The idea is to start processing the results of the asynchronous tasks as soon as they are available.

The best I could come up with is the following function:

collect :: [Async a] -> IO [a]
collect [] = return []
collect asyncs = do
  (a, r) <- waitAny asyncs
  rs <- collect (filter (/= a) asyncs)
  return (r:rs)

However, this function does not exhibit the desired behavior: as pointed out in the comment below, it doesn't return until all the asynchronous tasks have completed. Furthermore, collect runs in O(n^2) time, since I'm filtering the list at each recursive step. This could be improved by using a more efficient structure (and maybe indexing the position of the Async values in the list).
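One way around the first problem, sketched here under the assumption that a callback is acceptable: rather than returning IO [a], hand each result to a continuation as soon as waitAny yields it, so processing of early results starts before the slower tasks finish. forEachCompleted and demoForEach are hypothetical names. The helper keeps the original filter, so it is still O(n^2).

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, waitAny)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical helper: call k on each result in the order the Asyncs finish.
-- Unlike collect, work on the first result starts before the rest are done.
-- Still O(n^2): waitAny and filter both walk the remaining list at each step.
forEachCompleted :: [Async a] -> (a -> IO ()) -> IO ()
forEachCompleted [] _ = return ()
forEachCompleted asyncs k = do
  (a, r) <- waitAny asyncs
  k r
  forEachCompleted (filter (/= a) asyncs) k

-- Small demo: records the order in which results were handled.
demoForEach :: IO [Int]
demoForEach = do
  slow <- async (threadDelay 200000 >> return 2)
  fast <- async (threadDelay 50000 >> return 1)
  seen <- newIORef []
  forEachCompleted [slow, fast] (\x -> modifyIORef' seen (x :))
  reverse <$> readIORef seen
```

demoForEach should return [1,2]: the 50 ms task is handled first even though it comes second in the list.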

Maybe there are library functions that take care of this, but I could not find any in the Control.Concurrent.Async module, and I wonder why.

EDIT: after thinking about the problem more carefully, I'm wondering whether such a function is a good idea at all. I could just use fmap on the asynchronous tasks. Maybe it is better practice to wait for the results only when there is no other choice.
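On the fmap idea: Async is a Functor, but as far as I understand its instance, fmap f only transforms the result when someone later reads it; it does not trigger any IO at completion time. If the goal is to start an effectful step as soon as each result exists, one option is to spawn a follow-up Async that waits and then processes. andThen and demoAndThen are hypothetical names, not part of the async package.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, wait)

-- Hypothetical helper: run k on its own thread as soon as a's result is ready.
-- Exceptions from a (or from k) surface when the returned Async is waited on.
andThen :: Async a -> (a -> IO b) -> IO (Async b)
andThen a k = async (wait a >>= k)

demoAndThen :: IO (Int, Int)
demoAndThen = do
  x <- async (threadDelay 50000 >> return 20)
  -- Pure transformation: applied when the result is read.
  let doubled = fmap (* 2) x
  -- Effectful follow-up: starts as soon as x finishes, whether or not
  -- anyone is waiting on y yet.
  y <- x `andThen` (\n -> return (n + 1))
  (,) <$> wait doubled <*> wait y
```

demoAndThen returns (40,21).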

Upvotes: 2

Views: 201

Answers (3)

Kostia R

Reputation: 2565

Implemented via TChan. I also implemented a version that can react to each result immediately, but it is more complex and might also have problems with exceptions (if you want to receive exceptions, use SlaveThread.fork instead of forkIO), so I left that code commented out in case you're not interested in it:

import           Control.Concurrent       (forkIO, threadDelay)
import           Control.Concurrent.Async
import           Control.Concurrent.STM
import           Control.Monad

collect :: [Async a] -> IO [a]
collect = atomically . collectSTM

collectSTM :: [Async a] -> STM [a]
collectSTM as = do
    c <- newTChan
    collectSTMChan c as

-- Note: all of this runs in a single STM transaction, which can only commit
-- once every waitSTM succeeds. So collect returns after the last Async has
-- finished, and the results come out in list order, not completion order.
collectSTMChan :: TChan a -> [Async a] -> STM [a]
collectSTMChan chan as = do
    mapM_ (waitSTM >=> writeTChan chan) as
    replicateM (length as) (readTChan chan)

main :: IO ()
main = do
    a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2)
    a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3)
    a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1)
    res <- collect [a1,a2,a3]
    putStrLn (show res)

    -- -- reacting immediately
    -- a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2)
    -- a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3)
    -- a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1)
    -- c <- collectChan [a1,a2,a3]
    -- replicateM_ 3 (atomically (readTChan c) >>= \v -> putStrLn ("Received: " ++ show v))

-- collectChan :: [Async a] -> IO (TChan a)
-- collectChan as = do
--     c <- newTChanIO
--     forM_ as $ \a -> forkIO ((atomically . (waitSTM >=> writeTChan c)) a)
--     return c
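On the exception problem mentioned above: one alternative to SlaveThread.fork, sketched here as an assumption rather than the answer's own code, is to have each forked waiter use waitCatchSTM, so a failed Async arrives on the channel as a Left instead of killing its waiter and leaving the reader blocked. collectChanE and demoCollectChanE are hypothetical names.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async (Async, async, waitCatchSTM)
import Control.Concurrent.STM
import Control.Exception (SomeException)
import Control.Monad (forM_, replicateM)

-- Hypothetical variant of collectChan: every outcome, success or failure,
-- is written to the channel as soon as its Async finishes.
collectChanE :: [Async a] -> IO (TChan (Either SomeException a))
collectChanE asyncs = do
  c <- newTChanIO
  -- One waiter thread per Async; waitCatchSTM does not rethrow, so a failure
  -- becomes a Left on the channel instead of silently killing the waiter.
  forM_ asyncs $ \a -> forkIO (atomically (waitCatchSTM a >>= writeTChan c))
  return c

demoCollectChanE :: IO [Maybe Int]
demoCollectChanE = do
  ok   <- async (threadDelay 100000 >> return 1)
  boom <- async (threadDelay 20000 >> ioError (userError "boom"))
  c <- collectChanE [ok, boom]
  -- Read exactly as many outcomes as there are Asyncs, in arrival order.
  outcomes <- replicateM 2 (atomically (readTChan c))
  return (map (either (const Nothing) Just) outcomes)
```

Since the failing task finishes first, demoCollectChanE returns [Nothing,Just 1].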

Upvotes: 1

Benjamin Hodgson

Reputation: 44634

As I mentioned in my other answer, streaming results out of a list of Asyncs as they become available is best achieved using a stream processing library. Here's an example using pipes.

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Functor (($>))
import Pipes
import Pipes.Concurrent  -- from the pipes-concurrency package
import qualified Pipes.Prelude as P


-- Stream the results of the Asyncs in the order they complete.
asCompleted :: MonadIO m => [Async a] -> Producer a m ()
asCompleted asyncs = do
    (o, i, seal) <- liftIO $ spawn' unbounded
    -- Background thread: wait on every Async concurrently, send each result
    -- to the mailbox as soon as it arrives, then seal the mailbox.
    _ <- liftIO $ forkIO $ do
        forConcurrently_ asyncs (\a -> atomically $ waitSTM a >>= send o)
        atomically seal
    -- Yield items until the mailbox is sealed and drained.
    fromInput i

main :: IO ()
main = do
    actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
    runEffect $ asCompleted actions >-> P.print
-- after one second, prints "foo", then "bar" a second later

Using pipes-concurrency, we spawn' an Output-Input pair and immediately convert the Input to a Producer using fromInput. On a separate thread, we send each item as soon as it becomes available. When all the Asyncs have completed, we seal the mailbox, which ends the Producer once the remaining items have been consumed.
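Without pipes-concurrency, the same shape (writers push results as they complete, then a final seal closes the stream) can be sketched with a plain TQueue from stm, using Nothing as the end-of-stream marker in place of seal. This is a lower-level substitute, not the pipes API; asCompletedIO and demoAsCompleted are hypothetical names.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, concurrently_, forConcurrently_, waitSTM)
import Control.Concurrent.STM

-- Hypothetical: call k on each result in completion order.
asCompletedIO :: [Async a] -> (a -> IO ()) -> IO ()
asCompletedIO asyncs k = do
  q <- newTQueueIO
  -- producer: push each result as soon as its Async finishes, then "seal"
  -- the stream by writing Nothing.
  -- consumer: handle items until the Nothing marker arrives.
  let producer = do
        forConcurrently_ asyncs (\a -> atomically (waitSTM a >>= writeTQueue q . Just))
        atomically (writeTQueue q Nothing)
      consumer = do
        next <- atomically (readTQueue q)
        case next of
          Nothing -> return ()
          Just x  -> k x >> consumer
  -- If either side throws (e.g. a failed Async), concurrently_ cancels the
  -- other side and rethrows, so the consumer is not left blocked forever.
  concurrently_ producer consumer

demoAsCompleted :: IO [String]
demoAsCompleted = do
  actions <- traverse async [threadDelay 200000 >> return "bar", threadDelay 50000 >> return "foo"]
  seen <- newTVarIO []
  asCompletedIO actions (\x -> atomically (modifyTVar' seen (x :)))
  reverse <$> readTVarIO seen
```

demoAsCompleted returns ["foo","bar"]. The Maybe wrapper is the design choice that replaces seal: the consumer needs some in-band signal to know no more items are coming.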

Upvotes: 1

Benjamin Hodgson

Reputation: 44634

I'm reading your question as "is it possible to sort a list of Asyncs by their completion time?". If that's what you meant, the answer is yes.

import Control.Applicative (liftA2)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Data.Functor (($>))
import Data.List (sortBy)
import Data.Ord (comparing)
import Data.Time (getCurrentTime)


sortByCompletion :: [Async a] -> IO [a]
sortByCompletion = fmap (fmap fst . sortBy (comparing snd)) . mapConcurrently withCompletionTime
    -- Wait for the Async, then record the time at which its result arrived.
    where withCompletionTime a = liftA2 (,) (wait a) getCurrentTime

main :: IO ()
main = do
    asyncs <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
    sortByCompletion asyncs >>= print
-- prints ["foo","bar"] after two seconds

Using mapConcurrently, we wait for each Async on a separate thread. Upon completion we read the current time (the time at which the Async completed) and use it to sort the results. The sort makes this O(n log n). (Your original algorithm was effectively a selection sort.)
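A variation on the same idea, offered as an alternative rather than a correction: instead of reading the wall clock (two Asyncs that finish within the clock's resolution get equal timestamps and, since sortBy is stable, fall back to list order), each waiter can take a ticket from a shared TVar counter in the same transaction that observes its result. sortByCompletionRank and demoRank are hypothetical names; like the clock version, the rank reflects when each waiter observed completion.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, mapConcurrently, waitSTM)
import Control.Concurrent.STM
import Data.List (sortOn)

-- Hypothetical: sort results by completion using a ticket counter
-- instead of getCurrentTime.
sortByCompletionRank :: [Async a] -> IO [a]
sortByCompletionRank asyncs = do
  counter <- newTVarIO (0 :: Int)
  -- Wait for one Async and take the next ticket in the same transaction,
  -- so every result gets a distinct rank in the order it was observed.
  let ranked a = atomically $ do
        r <- waitSTM a
        n <- readTVar counter
        writeTVar counter (n + 1)
        return (n, r)
  map snd . sortOn fst <$> mapConcurrently ranked asyncs

demoRank :: IO [String]
demoRank = do
  asyncs <- traverse async [threadDelay 200000 >> return "bar", threadDelay 50000 >> return "foo"]
  sortByCompletionRank asyncs
```

demoRank returns ["foo","bar"].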

Like your collect, sortByCompletion doesn't return until all the Asyncs in the list have completed. If you wanted to stream results onto the main thread as they become available, well, lists aren't a very good tool for that. I'd use a streaming abstraction like conduit or pipes, or, working at a lower level, a TQueue. See my other answer for an example.

Upvotes: 0
