daydaynatation
daydaynatation

Reputation: 550

Haskell Exception: thread blocked indefinitely in an MVar operation

getPage :: MVar String -> IO String
getPage m = do
  url <- takeMVar m
  putStrLn $ url ++ " is downloaded!"
  return url

testDownload :: IO ()
testDownload = do
  m <- newMVar "url1"
  pages <- replicateConcurrently 10 (getPage m)
  sequence_ [putMVar m ("url" ++ show i) | i <- [2..11]]

I'm just getting into concurrent programming in Haskell and still can't find my way around it. In the above code, the idea is to keep putting stuff into the MVar String while the threads created by replicateConcurrently will keep taking the MVars.

But I got a ***Exception: thread blocked indefinitely in an MVar operation

Upvotes: 1

Views: 246

Answers (1)

DDub
DDub

Reputation: 3924

I see what you're trying to do, but the code doesn't actually do what you want. In particular, it's the line

  pages <- replicateConcurrently 10 (getPage m)

that's causing you trouble. It's true that you're creating 10 threads to run 10 different calls to getPage m, but then you're collecting the results, and because you're doing this "concurrently", you're waiting for all of the threads you created to finish before continuing.

The simplest way to achieve what you're doing is probably to use mapConcurrently instead of replicateConcurrently and not use the MVar at all, as in:

getPage :: String -> IO String
getPage url = do
  putStrLn $ url ++ " is downloaded!"
  return url

testDownload :: IO ()
testDownload = do
  pages <- mapConcurrently getPage ["url" ++ show i | i <- [2..11]]
  putStrLn $ unlines pages

However, there is a way to achieve your goal and use the MVar for communication. The trick is that you need something like replicateConcurrently but that works asynchronously. Consider, for instance:

replicateAsync :: Int -> IO a -> (IO [a] -> IO b) -> IO b
replicateAsync n a f = go n []
  where
    go n waits
      | n <= 0 = f $ sequenceA $ reverse waits
      | otherwise = withAsync a $ \w -> go (n-1) (wait w:waits)

The type signature is a cross between withAsync and replicateConcurrently, and the behavior is too. The first argument is the number of threads to spawn, the second is the action to replicate, and the third is a continuation for what to do with the result.

You can use this to write testDownload close to what you had:

testDownload :: IO ()
testDownload = do
  m <- newMVar "url1"
  replicateAsync 10 (getPage m) $ \awaitPages -> do
    sequence_ [putMVar m ("url" ++ show i) | i <- [2..11]]
    pages <- awaitPages
    putStrLn $ unlines pages

Notice that unlike with replicateConcurrently, the result you're waiting for is not produced right away when you call replicateAsync. Instead, in your continuation, you're given a value of type IO [a] (which I called awaitPages here). When you perform this action, you'll actually do the synchronization and collection step, so as long as you do that after you sequence your putMVars, then all the threads will be properly active at once.

Upvotes: 4

Related Questions