Reputation: 550
import Control.Concurrent.Async (replicateConcurrently)
import Control.Concurrent.MVar

getPage :: MVar String -> IO String
getPage m = do
  url <- takeMVar m
  putStrLn $ url ++ " is downloaded!"
  return url

testDownload :: IO ()
testDownload = do
  m <- newMVar "url1"
  pages <- replicateConcurrently 10 (getPage m)
  sequence_ [putMVar m ("url" ++ show i) | i <- [2..11]]
I'm just getting into concurrent programming in Haskell and still can't find my way around it. In the above code, the idea is to keep putting values into the MVar String while the threads created by replicateConcurrently keep taking them out.
But I got this exception: *** Exception: thread blocked indefinitely in an MVar operation
Upvotes: 1
Views: 246
Reputation: 3924
I see what you're trying to do, but the code doesn't actually do what you want. In particular, it's the line
pages <- replicateConcurrently 10 (getPage m)
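that's causing you trouble. It's true that you're creating 10 threads to run 10 different calls to getPage m, but replicateConcurrently also collects the results, so it blocks until every one of those threads has finished. Only one thread can take the initial "url1"; the other nine wait in takeMVar for values that the putMVar line never gets to supply, because that line comes after the blocked call. Nothing can make progress, and the runtime raises the exception you saw.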
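To see the blocking behaviour in isolation, here is a minimal sketch that uses only base. It is not your program: timeout from System.Timeout stands in for the deadlock so that the example terminates, and tryTake is a hypothetical helper name I made up for the illustration.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, takeMVar)
import System.Timeout (timeout)

-- Hypothetical helper: try to take from the MVar, giving up after 0.1 s.
tryTake :: MVar String -> IO (Maybe String)
tryTake m = timeout 100000 (takeMVar m)

main :: IO ()
main = do
  m <- newMVar "url1"
  first <- tryTake m    -- the MVar is full, so this take succeeds
  second <- tryTake m   -- the MVar is now empty, so this take blocks until the timeout
  print first
  print second
```

The second take is in the same position as nine of your ten threads: it can only finish if someone else puts a value, and in your code the only putMVar calls sit behind the call that is waiting for those threads.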
The simplest way to achieve what you're doing is probably to use mapConcurrently instead of replicateConcurrently and not use the MVar at all, as in:
import Control.Concurrent.Async (mapConcurrently)

getPage :: String -> IO String
getPage url = do
  putStrLn $ url ++ " is downloaded!"
  return url

testDownload :: IO ()
testDownload = do
  pages <- mapConcurrently getPage ["url" ++ show i | i <- [2..11]]
  putStrLn $ unlines pages
However, there is a way to achieve your goal and use the MVar for communication. The trick is that you need something like replicateConcurrently but that works asynchronously. Consider, for instance:
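import Control.Concurrent.Async (wait, withAsync)

replicateAsync :: Int -> IO a -> (IO [a] -> IO b) -> IO b
replicateAsync n a f = go n []
  where
    -- Spawn one thread per step, accumulating the "wait" actions;
    -- once all are spawned, hand the combined wait to the continuation.
    go k waits
      | k <= 0 = f $ sequenceA $ reverse waits
      | otherwise = withAsync a $ \w -> go (k-1) (wait w:waits)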
The type signature is a cross between withAsync and replicateConcurrently, and the behavior is too. The first argument is the number of threads to spawn, the second is the action to replicate, and the third is a continuation that receives an action for collecting the results and decides when to run it.
You can use this to write testDownload close to what you had:
testDownload :: IO ()
testDownload = do
  m <- newMVar "url1"
  replicateAsync 10 (getPage m) $ \awaitPages -> do
    sequence_ [putMVar m ("url" ++ show i) | i <- [2..11]]
    pages <- awaitPages
    putStrLn $ unlines pages
Notice that unlike with replicateConcurrently, the result you're waiting for is not produced right away when you call replicateAsync. Instead, in your continuation, you're given a value of type IO [a] (which I called awaitPages here). Only when you perform this action do you actually do the synchronization and collection step, so as long as you do that after you sequence your putMVars, all the threads will be properly active at once.
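If you want to experiment with the ordering (spawn first, feed the MVar second, wait last) without the async package, the same idea can be sketched with only base. Treat this as an illustration rather than a replacement for replicateAsync: spawnWorkers and downloadAll are hypothetical names, getPage is reduced to the bare takeMVar so the output stays deterministic, and unlike withAsync the sketch does not propagate exceptions or cancel workers.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (replicateM)
import Data.List (sort)

-- The worker from the question, minus the printing: take one URL.
getPage :: MVar String -> IO String
getPage = takeMVar

-- Hypothetical helper: fork n copies of an action and return one
-- result MVar per worker. Nothing is awaited here.
spawnWorkers :: Int -> IO a -> IO [MVar a]
spawnWorkers n act = replicateM n spawn
  where
    spawn = do
      done <- newEmptyMVar
      _ <- forkIO (act >>= putMVar done)
      return done

downloadAll :: IO [String]
downloadAll = do
  m <- newMVar "url1"
  results <- spawnWorkers 10 (getPage m)                      -- start the workers first
  mapM_ (\i -> putMVar m ("url" ++ show i)) [2 .. 11 :: Int]  -- then feed them
  mapM takeMVar results                                       -- only now wait for results

main :: IO ()
main = do
  pages <- downloadAll
  putStrLn $ unlines (sort pages)
```

The ten workers consume "url1" through "url10", and the last putMVar leaves "url11" sitting in the MVar, just as in the replicateAsync version. Which worker gets which URL is up to the scheduler, which is why the sketch sorts the pages before printing.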
Upvotes: 4