Reputation: 45622
I'd like to use Control.Concurrent.Async's mapConcurrently to perform parallel downloads with http-conduit. The solution here is not sufficient for my case, because I'd like to process n tasks while throttling the number of concurrent workers to m, where m < n.
Nor is it enough to pass mapConcurrently multiple chunks of m tasks: some tasks in each chunk will complete earlier than others, so the number of active workers will tend to drop below m, leaving a utilization gap.
Is there an easy way -- nearly as easy as using mapConcurrently, I hope -- to implement a worker pool that concurrently performs a queue of tasks until all tasks are done? Or is it easier to keep the Haskell simple and do process-level parallelism with xargs -P?
Upvotes: 3
Views: 1072
Reputation: 25665
Update: my answer is now obsolete; use Sibi's answer instead, as we have since put an implementation of this solution into unliftio.
You can use monad-par for this, which, like async, is written by Simon Marlow.
Example:
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Par.Combinator (parMapM)
import Control.Monad.Par.IO (runParIO)
download :: Int -> IO String
download i = do
  putStrLn ("downloading " ++ show i)
  threadDelay 1000000 -- sleep 1 second
  putStrLn ("downloading " ++ show i ++ " finished")
  return "fake response"

main :: IO ()
main = do
  -- "pooled" mapM
  responses <- runParIO $ parMapM (\i -> liftIO $ download i) [1..10]
  mapM_ putStrLn responses
Compile with ghc --make -threaded PooledMapM.hs and run with ./PooledMapM +RTS -N2.
You'll see the output:
downloading 10
downloading 9
downloading 9 finished
downloading 10 finished
downloading 8
downloading 7
downloading 8 finished
downloading 7 finished
downloading 6
downloading 5
downloading 6 finished
downloading 5 finished
downloading 4
downloading 3
downloading 4 finished
downloading 3 finished
downloading 2
downloading 1
downloading 2 finished
downloading 1 finished
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response
Code also available via https://gist.github.com/nh2/bfa3f182da9d13baa536
Upvotes: 1
Reputation: 48654
Recent versions of unliftio have various combinators for pooled concurrency. Adapting Niklas's code from the other answer, you would do it with unliftio like this:
#!/usr/bin/env stack
-- stack --resolver lts-14.11 --install-ghc runghc --package unliftio --package say
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (threadDelay)
import Say
import UnliftIO.Async
download :: Int -> IO String
download i = do
  sayString ("downloading " ++ show i)
  threadDelay 1000000 -- sleep 1 second
  sayString ("downloading " ++ show i ++ " finished")
  return "fake response"

main :: IO ()
main = do
  responses <- pooledMapConcurrentlyN 5 download [1 .. 5]
  print responses
In the above code, we never spawn more than five threads at once. Its output on execution:
$ stack pooled.hs
downloading 1
downloading 2
downloading 3
downloading 4
downloading 5
downloading 1 finished
downloading 5 finished
downloading 2 finished
downloading 3 finished
downloading 4 finished
["fake response","fake response","fake response","fake response","fake response"]
Upvotes: 3
Reputation: 27756
Perhaps the simplest solution is to throttle the IO actions using a semaphore before wrapping them in Concurrently, with a helper function like this one:
withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b)
withConc sem f = \a ->
  Concurrently (bracket_ (waitQSem sem) (signalQSem sem) (f a))
We can use withConc in combination with traverse to perform a throttled concurrent traversal of any Traversable container of tasks:
traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b]
traverseThrottled concLevel action tasks = do
  sem <- newQSem concLevel
  runConcurrently (traverse (withConc sem action) tasks)
One disadvantage of this approach is that Concurrently will create as many threads as there are tasks, and only a subset of them will be doing actual work at any given time, thanks to the semaphore. On the other hand, threads in Haskell are cheap, so I think this is an acceptable solution when the number of tasks is not very big.
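If one-thread-per-task overhead ever does become a concern, a fixed pool of exactly m worker threads draining a shared job queue avoids it. Below is a minimal sketch using only base; mapPool is an illustrative name, and this simplified version does not propagate exceptions from worker threads:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan
import Control.Monad (forM_, replicateM, replicateM_)
import Data.List (sortOn)

-- Run `action` over `tasks` with exactly `m` worker threads.
mapPool :: Int -> (a -> IO b) -> [a] -> IO [b]
mapPool m action tasks = do
  jobs    <- newChan
  results <- newChan
  forM_ (zip [0 :: Int ..] tasks) (writeChan jobs . Just)
  replicateM_ m (writeChan jobs Nothing)   -- one stop marker per worker
  let worker = do
        job <- readChan jobs
        case job of
          Nothing     -> return ()         -- queue drained, worker exits
          Just (i, a) -> do
            b <- action a
            writeChan results (i, b)
            worker
  replicateM_ m (forkIO worker)
  -- exactly one result arrives per task; restore input order by index
  rs <- replicateM (length tasks) (readChan results)
  return (map snd (sortOn fst rs))

main :: IO ()
main = do
  squares <- mapPool 3 (\i -> threadDelay 50000 >> return (i * i)) [1 .. 8 :: Int]
  print squares  -- [1,4,9,16,25,36,49,64]
```

The stop markers (one Nothing per worker) let every thread shut down cleanly once the queue is empty, and tagging each job with its index lets the results be reassembled in input order even though they complete out of order.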
Edit: giving traverseThrottled a more general signature:
import Data.Traversable
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
  sem <- newQSem concLevel
  let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
  runConcurrently (traverse (Concurrently . throttledAction) taskContainer)
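For completeness, here is how the generalized version might be exercised end to end, as a self-contained sketch (it assumes the async package; the doubling task and the concurrency level of 3 are just illustrative):

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Control.Concurrent.QSem
import Control.Exception (bracket_)

-- Semaphore-throttled concurrent traversal, as defined above.
traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
  sem <- newQSem concLevel
  let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
  runConcurrently (traverse (Concurrently . throttledAction) taskContainer)

main :: IO ()
main = do
  -- at most 3 of the 10 tasks run at once; traverse keeps input order
  rs <- traverseThrottled 3 (\i -> threadDelay 100000 >> return (i * 2)) [1 .. 10 :: Int]
  print rs  -- [2,4,6,8,10,12,14,16,18,20]
```

Because traverse builds the result in the shape of the input container, the responses come back in input order regardless of which tasks finish first.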
Upvotes: 5
Reputation: 63359
I'd suggest using parallel or parallelInterleaved from parallel-io. They have (among others) these properties:
- Never creates more or less unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel. This should minimize contention and hence pre-emption, while also preventing starvation.
- On return all actions have been performed.
- The function returns in a timely manner as soon as all actions have been performed.
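As a sketch of how this might look, assuming the Control.Concurrent.ParallelIO.Local interface with withPool and parallel (the sleep-and-return task is just illustrative):

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local (parallel, withPool)

main :: IO ()
main = do
  -- a local pool of 4 threads works through 10 actions; parallel
  -- returns the results in the same order as the supplied actions
  rs <- withPool 4 $ \pool ->
    parallel pool [threadDelay 100000 >> return i | i <- [1 .. 10 :: Int]]
  print rs
```

Using the Local variant with withPool scopes the pool to one call site; the Global variant instead shares one pool sized via +RTS -N.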
Upvotes: 2