Reputation: 5037
How would you collect the results of a list of Async a in Haskell as they become available? The idea is to start processing the results of asynchronous tasks as soon as they are available.
The best I could come up with is the following function:
collect :: [Async a] -> IO [a]
collect [] = return []
collect asyncs = do
  (a, r) <- waitAny asyncs
  rs <- collect (filter (/= a) asyncs)
  return (r:rs)
However, this function does not exhibit the desired behavior since, as pointed out in the comment below, it doesn't return until all the asynchronous tasks have completed. Furthermore, collect runs in O(n^2) since I'm filtering the list at each recursive step. This could be improved by using a more efficient structure (and maybe indexing the position of the Async values in the list).
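If the goal is only to act on each result as soon as it arrives, one option is to keep the waitAny loop from collect but pass each result to a caller-supplied step function instead of consing it onto a list. This is a minimal sketch: foldCompleted is a hypothetical helper, not part of Control.Concurrent.Async, and it keeps the O(n^2) cost, because waitAny and filter both scan the remaining list on every step.

```haskell
import Control.Concurrent.Async (Async, async, wait, waitAny)

-- Hypothetical helper: fold over the results of a list of Asyncs in the
-- order in which they complete. The step function runs as soon as each
-- result is available, instead of after every task has finished.
foldCompleted :: (b -> a -> IO b) -> b -> [Async a] -> IO b
foldCompleted _ acc [] = return acc
foldCompleted step acc asyncs = do
  -- Block until any remaining Async finishes (rethrows if it failed).
  (done, r) <- waitAny asyncs
  acc' <- step acc r
  -- Still O(n) per step: waitAny and filter both scan the remaining list.
  foldCompleted step acc' (filter (/= done) asyncs)
```

For example, foldCompleted (\n x -> print x >> return (n + 1)) 0 asyncs would print each result as it finishes and return how many results were seen.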
Maybe there are library functions that take care of this, but I could not find them in the Control.Concurrent.Async module, and I wonder why.
EDIT: after thinking about the problem a bit more carefully, I'm wondering whether such a function is a good idea. I could just use fmap on the asynchronous tasks. Maybe it is better practice to wait for the results only when there is no other choice.
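On the fmap idea: Async is a Functor, so fmap f a gives an Async that shares a's thread and applies the pure function f to the result only when it is read (for example with wait). On its own that does not start any work when the task finishes. The sketch below shows both options; thenAsync is a hypothetical helper that spawns a follow-up Async whose IO action starts as soon as the original completes.

```haskell
import Control.Concurrent.Async (Async, async, wait)

-- Pure post-processing: the mapped Async shares the original thread, and
-- (* 2) is only applied when the result is read.
doubled :: Async Int -> Async Int
doubled = fmap (* 2)

-- Hypothetical helper: run an IO follow-up on its own thread as soon as
-- the original Async completes. An exception from either step is rethrown
-- by wait on the returned Async.
thenAsync :: Async a -> (a -> IO b) -> IO (Async b)
thenAsync a k = async (wait a >>= k)
```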
Upvotes: 2
Views: 201
Reputation: 2565
Implemented via TChan. I also implemented a version which can react immediately, but it is more complex and might have problems with exceptions (if you want to receive exceptions, use SlaveThread.fork instead of forkIO), so I commented out that code in case you're not interested in it:
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad

collect :: [Async a] -> IO [a]
collect = atomically . collectSTM

collectSTM :: [Async a] -> STM [a]
collectSTM as = do
  c <- newTChan
  collectSTMChan c as

-- Note: this runs as a single STM transaction, which retries until every
-- Async has finished, so the results come back in list order, not completion order.
collectSTMChan :: TChan a -> [Async a] -> STM [a]
collectSTMChan chan as = do
  mapM_ (waitSTM >=> writeTChan chan) as
  replicateM (length as) (readTChan chan)

main :: IO ()
main = do
  a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2)
  a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3)
  a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1)
  res <- collect [a1,a2,a3]
  print res
  -- -- reacting immediately
  -- a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2)
  -- a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3)
  -- a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1)
  -- c <- collectChan [a1,a2,a3]
  -- replicateM_ 3 (atomically (readTChan c) >>= \v -> putStrLn ("Received: " ++ show v))

-- -- one forked waiter per Async writes its result as soon as it is available
-- collectChan :: [Async a] -> IO (TChan a)
-- collectChan as = do
--   c <- newTChanIO
--   forM_ as $ \a -> forkIO (atomically (waitSTM a >>= writeTChan c))
--   return c
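The exception issue mentioned above can also be sidestepped without SlaveThread by having each forked waiter use waitCatchSTM, which returns Either SomeException a instead of rethrowing. With waitSTM, a failing Async kills its forked thread before it writes anything, so a reader expecting one item per Async can end up blocked waiting for an item that never arrives. This is a minimal sketch under that assumption; collectChanCatch is a hypothetical name, and the caller decides what to do with each Left.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Async (Async, async, waitCatchSTM)
import Control.Concurrent.STM (TChan, atomically, newTChanIO, readTChan, writeTChan)
import Control.Exception (SomeException)
import Control.Monad (forM_, replicateM)

-- Hypothetical variant of collectChan: forward each outcome, success or
-- failure, as soon as its Async finishes. waitCatchSTM does not rethrow,
-- so each forked thread writes one item to the channel.
collectChanCatch :: [Async a] -> IO (TChan (Either SomeException a))
collectChanCatch asyncs = do
  c <- newTChanIO
  forM_ asyncs $ \a -> forkIO (atomically (waitCatchSTM a >>= writeTChan c))
  return c
```

The reader then does replicateM (length asyncs) (atomically (readTChan c)) and handles Left and Right values as they come.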
Upvotes: 1
Reputation: 44634
As I mentioned in my other answer, streaming results out of a list of Asyncs as they become available is best achieved using a stream processing library. Here's an example using pipes.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad (void)
import Data.Functor (($>))
import Pipes
import Pipes.Concurrent -- from the pipes-concurrency package
import qualified Pipes.Prelude as P

asCompleted :: MonadIO m => [Async a] -> Producer a m ()
asCompleted asyncs = do
  (o, i, seal) <- liftIO $ spawn' unbounded
  -- Feed the mailbox from a background thread, one waiter per Async.
  liftIO $ void $ forkIO $ do
    forConcurrently_ asyncs (\a -> atomically $ waitSTM a >>= send o)
    -- Every result has been sent: seal the mailbox so fromInput terminates.
    atomically seal
  fromInput i

main :: IO ()
main = do
  actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
  runEffect $ asCompleted actions >-> P.print
  -- after one second, prints "foo", then "bar" a second later
Using pipes-concurrency, we spawn' an Output-Input pair and immediately convert the Input to a Producer using fromInput. Asynchronously, we send items as they become available. When all the Asyncs have completed, we seal the inbox to close down the Producer.
Upvotes: 1
Reputation: 44634
I'm reading your question as "is it possible to sort a list of Asyncs by their completion time?". If that's what you meant, the answer is yes.
import Control.Applicative (liftA2)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Data.Functor (($>))
import Data.List (sortBy)
import Data.Ord (comparing)
import Data.Time (getCurrentTime) -- from the time package

sortByCompletion :: [Async a] -> IO [a]
sortByCompletion = fmap (fmap fst . sortBy (comparing snd)) . mapConcurrently withCompletionTime
  where withCompletionTime a = liftA2 (,) (wait a) getCurrentTime

main :: IO ()
main = do
  asyncs <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"]
  sortByCompletion asyncs >>= print
  -- prints ["foo","bar"] after two seconds
Using mapConcurrently, we wait for each Async on a separate thread. Upon completion we get the current time (the time at which the Async completed) and use it to sort the results. The sort makes this O(n log n). (Your original algorithm was effectively a selection sort.)
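getCurrentTime can in principle return equal timestamps for tasks that finish very close together (sortBy is stable, so ties fall back to list order). A variant swaps the clock for a shared counter: each waiter takes the next ticket right after its wait returns, so no two results share a key. The ticket order still reflects when the waiting threads were scheduled, just like the timestamps, so treat it as approximate completion order. sortByCompletion' and withTicket are hypothetical names in this sketch.

```haskell
import Control.Concurrent.Async (Async, async, mapConcurrently, wait)
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import Data.List (sortOn)

-- Hypothetical variant of sortByCompletion: tag each result with a ticket
-- from a shared counter instead of a timestamp, then sort by ticket.
sortByCompletion' :: [Async a] -> IO [a]
sortByCompletion' asyncs = do
  counter <- newIORef (0 :: Int)
  tagged <- mapConcurrently (withTicket counter) asyncs
  return (map snd (sortOn fst tagged))
  where
    -- Wait for the result, then atomically take the next ticket number.
    withTicket :: IORef Int -> Async a -> IO (Int, a)
    withTicket ref a = do
      r <- wait a
      n <- atomicModifyIORef' ref (\k -> (k + 1, k))
      return (n, r)
```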
Like your collect, sortByCompletion doesn't return until all the Asyncs in the list have completed. If you wanted to stream results onto the main thread as they become available, well, lists aren't a very good tool for that. I'd use a streaming abstraction like conduit or pipes, or, working at a lower level, a TQueue. See my other answer for an example.
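At the TQueue level, a minimal sketch could look like this: one waiter per Async pushes its result into a queue, and a consumer pops one item per Async and hands it to a callback. forEachAsCompleted is a hypothetical name; concurrently_ and forConcurrently_ come from the async package. concurrently_ cancels the other side and rethrows if either side throws, so a failing Async surfaces as an exception instead of leaving the consumer stuck.

```haskell
import Control.Concurrent.Async (Async, async, concurrently_, forConcurrently_, waitSTM)
import Control.Concurrent.STM
import Control.Monad (replicateM_)

-- Hypothetical helper: call handler on each result in completion order.
forEachAsCompleted :: [Async a] -> (a -> IO ()) -> IO ()
forEachAsCompleted asyncs handler = do
  q <- newTQueueIO
  -- Producer side: each waiter writes its result the moment it is ready.
  let feed = forConcurrently_ asyncs (\a -> atomically (waitSTM a >>= writeTQueue q))
      -- Consumer side: exactly one item per Async, handled as it arrives.
      consume = replicateM_ (length asyncs) (atomically (readTQueue q) >>= handler)
  concurrently_ feed consume
```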
Upvotes: 0