dan

Reputation: 45622

Running parallel URL downloads with a worker pool in Haskell

I'd like to use mapConcurrently from Control.Concurrent.Async to perform parallel downloads with http-conduit. The solution here is not sufficient for my case, because I'd like to process n tasks while throttling the number of concurrent workers to m, where m < n.

It's not enough either to pass mapConcurrently multiple chunks of m tasks, because then the number of active workers tends to drop below m: some tasks in a chunk complete earlier than others, leaving a utilization gap until the whole chunk finishes.

Is there an easy way -- nearly as easy as using mapConcurrently, I hope -- to implement a worker pool that concurrently works through a queue of tasks until all of them are done?

Or is it easier to just keep the Haskell simple and do process-level parallelism with xargs -P?

Upvotes: 3

Views: 1072

Answers (4)

nh2

Reputation: 25665

Update: My answer is now obsolete. Use Sibi's answer instead, as we have since put an implementation of this solution into unliftio.

You can use monad-par for this, which, like async, was made by Simon Marlow.

Example:

import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Par.Combinator (parMapM)
import Control.Monad.Par.IO (runParIO)

download :: Int -> IO String
download i = do
  putStrLn ("downloading " ++ show i)
  threadDelay 1000000 -- sleep 1 second
  putStrLn ("downloading " ++ show i ++ " finished")
  return "fake response"


main :: IO ()
main = do
  -- "pooled" mapM
  responses <- runParIO $ parMapM (\i -> liftIO $ download i) [1..10]
  mapM_ putStrLn responses

Compile with ghc --make -threaded PooledMapM.hs and run with ./PooledMapM +RTS -N2.

You'll see the output:

downloading 10
downloading 9
downloading 9 finished
downloading 10 finished
downloading 8
downloading 7
downloading 8 finished
downloading 7 finished
downloading 6
downloading 5
downloading 6 finished
downloading 5 finished
downloading 4
downloading 3
downloading 4 finished
downloading 3 finished
downloading 2
downloading 1
downloading 2 finished
downloading 1 finished
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response
fake response

Code also available via https://gist.github.com/nh2/bfa3f182da9d13baa536

Upvotes: 1

Sibi

Reputation: 48654

The recent version of unliftio has various combinators for pooled concurrency.

Adapting Niklas's code from the other answer, you would do it using unliftio like this:

#!/usr/bin/env stack
-- stack --resolver lts-14.11 --install-ghc runghc --package unliftio --package say
{-# LANGUAGE OverloadedStrings #-}

import Control.Concurrent (threadDelay)
import Say
import UnliftIO.Async

download :: Int -> IO String
download i = do
  sayString ("downloading " ++ show i)
  threadDelay 1000000 -- sleep 1 second
  sayString ("downloading " ++ show i ++ " finished")
  return "fake response"

main :: IO ()
main = do
  responses <- pooledMapConcurrentlyN 5 download [1 .. 5]
  print responses

In the above code, we never spawn more than five threads. Here is its output on execution:

$ stack pooled.hs
downloading 1
downloading 2
downloading 3
downloading 4
downloading 5
downloading 1 finished
downloading 5 finished
downloading 2 finished
downloading 3 finished
downloading 4 finished
["fake response","fake response","fake response","fake response","fake response"]

Upvotes: 3

danidiaz

Reputation: 27756

Perhaps the simplest solution is to throttle the IO actions using a semaphore before wrapping them in Concurrently, using a helper function like this one:

withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b)
withConc sem f = \a -> Concurrently 
    (bracket_ (waitQSem sem) (signalQSem sem) (f a))

We can use withConc in combination with traverse to perform a throttled concurrent traversal of any Traversable container of tasks:

traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b]
traverseThrottled concLevel action tasks = do
    sem <- newQSem concLevel
    runConcurrently (traverse (withConc sem action) tasks)

One disadvantage of this approach is that Concurrently will create as many threads as there are tasks, and thanks to the semaphore only a subset of them will be doing actual work at any given time.

On the other hand, threads in Haskell are cheap, so I think it's an acceptable solution when the number of tasks is not very big.

Edit: Giving traverseThrottled a more general signature:

import Data.Traversable 
import Control.Concurrent
import Control.Concurrent.Async 
import Control.Exception

traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b) 
traverseThrottled concLevel action taskContainer = do
    sem <- newQSem concLevel
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer)
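For completeness, a hypothetical usage sketch of the generalized traverseThrottled (the fake download body and the limits are made up for illustration; the combinator is repeated so the sketch compiles standalone):

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- Same combinator as above, repeated so this file is self-contained.
traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
traverseThrottled concLevel action taskContainer = do
    sem <- newQSem concLevel
    let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action
    runConcurrently (traverse (Concurrently . throttledAction) taskContainer)

main :: IO ()
main = do
  -- 10 fake downloads, at most 3 in flight at once. Results come back
  -- in input order because traverse preserves the container's shape.
  responses <- traverseThrottled 3
                 (\i -> threadDelay 100000 >> return ("response " ++ show i))
                 [1 .. 10 :: Int]
  mapM_ putStrLn responses
```

Note that the ordering of the results is deterministic even though the scheduling is not: Concurrently's Applicative instance collects results positionally.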

Upvotes: 5

Petr

Reputation: 63359

I'd suggest using parallel or parallelInterleaved from parallel-io. It has (among others) these properties:

  1. Never creates more or less unblocked threads than are specified to live in the pool. NB: this count includes the thread executing parallel. This should minimize contention and hence pre-emption, while also preventing starvation.
  2. On return all actions have been performed.
  3. The function returns in a timely manner as soon as all actions have been performed.
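To illustrate, here is a minimal sketch using Control.Concurrent.ParallelIO.Local with an explicit 5-worker pool; the fake download action mirrors the one in the other answers, and the pool size is an arbitrary choice. Compile with -threaded to get actual parallelism:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.ParallelIO.Local (parallel, withPool)

download :: Int -> IO String
download i = do
  putStrLn ("downloading " ++ show i)
  threadDelay 1000000 -- sleep 1 second
  return "fake response"

main :: IO ()
main =
  -- withPool creates a pool of 5 worker threads and tears it down afterwards;
  -- parallel runs the actions on that pool and returns results in input order.
  withPool 5 $ \pool -> do
    responses <- parallel pool (map download [1 .. 10])
    mapM_ putStrLn responses
```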

Upvotes: 2
