Reputation: 623
I want to implement a pipeline between two threads. Thread A takes the data, processes it, and sends it to thread B. I use an MVar to signal that the data has been completely processed.
However, I'm getting an exception: *** Exception: thread blocked indefinitely in an STM transaction
Why are my threads blocked? I thought that once the first thread writes to the channel and there is data on it, the second one can read it.
    fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
    fstPipe f chIn m xs = do
        (mapM_ (\x -> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()

    pipelineDone channel mIn = do
        isDone <- fmap isJust $ tryTakeMVar mIn
        isEmpty <- atomically $ isEmptyTChan channel
        return $ isDone && isEmpty

    lastPipe f chIn mIn = iter
        where iter = do
                atomically $ fmap f $ readTChan chIn
                isDone <- pipelineDone chIn mIn
                unless isDone $ iter

    pipeline = do
        chIn <- atomically newTChan
        m <- newEmptyMVar
        first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
        last <- async $ lastPipe print chIn m
        wait first
        wait last
Upvotes: 2
Views: 113
Reputation: 4049
Your problem is in the logic of pipelineDone. Currently, you have:
    pipelineDone channel mIn = do
        isDone <- fmap isJust $ tryTakeMVar mIn
        isEmpty <- atomically $ isEmptyTChan channel
        return $ isDone && isEmpty
tryTakeMVar takes the contents of the MVar if there is something in it. Assuming your producer finishes first, it writes () into the MVar. Your consumer then tries to take the contents. If it succeeds, the MVar becomes empty, so every subsequent tryTakeMVar returns Nothing. From then on isDone && isEmpty is always False, and you keep trying to read from the TChan. Once the TChan is empty, the consumer blocks on the read, and GHC can tell you that it has encountered a deadlock.
You should instead change your pipelineDone implementation to:
    pipelineDone channel mIn = do
        stillRunning <- isEmptyMVar mIn
        isEmpty <- atomically $ isEmptyTChan channel
        return $ (not stillRunning) && isEmpty
This simply polls the MVar instead of actually emptying it.
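The difference between taking and polling can be checked in isolation. The following is a minimal sketch using only Control.Concurrent.MVar from base; takeTwice and peekTwice are hypothetical names introduced for illustration, not part of the question's code.

```haskell
import Control.Concurrent.MVar

-- Two consecutive tryTakeMVar calls on a full MVar.
-- (takeTwice is a hypothetical helper for this demonstration.)
takeTwice :: IO (Maybe (), Maybe ())
takeTwice = do
  m <- newMVar ()
  a <- tryTakeMVar m   -- succeeds and leaves the MVar empty
  b <- tryTakeMVar m   -- the signal is gone, so this finds nothing
  return (a, b)

-- Two consecutive isEmptyMVar calls on a full MVar.
-- (peekTwice is a hypothetical helper for this demonstration.)
peekTwice :: IO (Bool, Bool)
peekTwice = do
  m <- newMVar ()
  a <- isEmptyMVar m   -- only inspects the MVar
  b <- isEmptyMVar m   -- same answer, because nothing was removed
  return (a, b)

main :: IO ()
main = do
  takeTwice >>= print   -- prints (Just (),Nothing)
  peekTwice >>= print   -- prints (False,False)
```

Note that isEmptyMVar only gives a snapshot, so the result can already be stale by the time it is used when other threads touch the same MVar.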
Upvotes: 1
Reputation: 62818
It seems odd to me to be using STM and semaphores in the same code block... Why not do the entire thing in STM?
In particular, why not a TChan (Maybe x), with Nothing indicating the end of the sequence?
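A rough sketch of that idea, assuming the stm and async packages that the question already uses. The names producer and consumer are hypothetical and chosen for illustration; the shorter input list is only there to keep the output small.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM

-- Producer: wrap each result in Just, then write Nothing as the end marker.
producer :: (a -> b) -> TChan (Maybe b) -> [a] -> IO ()
producer f ch xs = do
  mapM_ (\x -> atomically $ writeTChan ch (Just (f x))) xs
  atomically $ writeTChan ch Nothing

-- Consumer: run the action on each item until the end marker arrives.
consumer :: (b -> IO ()) -> TChan (Maybe b) -> IO ()
consumer act ch = loop
  where
    loop = do
      next <- atomically $ readTChan ch   -- blocks while the channel is empty
      case next of
        Nothing -> return ()              -- end of the sequence
        Just x  -> act x >> loop

main :: IO ()
main = do
  ch <- newTChanIO
  p <- async $ producer reverse ch (replicate 3 [1 .. 5 :: Int])
  c <- async $ consumer print ch
  wait p
  wait c
```

Because the end marker travels through the same channel as the data, the consumer cannot see it before it has seen every item, so no separate "done" signal is needed.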
Also, notice that your fstPipe likely just generates a bunch of unevaluated thunks and immediately chucks them into the TChan, without actually computing anything. You probably want a seq or similar in there to force some actual work to happen on that thread.
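One way to do this, as a sketch: force each result with evaluate from Control.Exception before writing it. fstPipeStrict is a hypothetical name, and the MVar parameter from the question is left out to keep the example small. evaluate only forces to weak head normal form; if the elements themselves are expensive, something like force from the deepseq package would be needed, which is an assumption beyond what the answer states.

```haskell
import Control.Concurrent.STM
import Control.Exception (evaluate)

-- Like fstPipe, but forces each result on the producing thread
-- before it goes into the channel.
-- (fstPipeStrict is a hypothetical name for this sketch.)
fstPipeStrict :: (a -> b) -> TChan b -> [a] -> IO ()
fstPipeStrict f ch xs = mapM_ send xs
  where
    send x = do
      y <- evaluate (f x)            -- forces f x to weak head normal form here
      atomically $ writeTChan ch y   -- the channel now holds an evaluated value

main :: IO ()
main = do
  ch <- newTChanIO
  fstPipeStrict reverse ch [[1, 2, 3 :: Int], [4, 5, 6]]
  a <- atomically $ readTChan ch
  b <- atomically $ readTChan ch
  print (a, b)   -- prints ([3,2,1],[6,5,4])
```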
Upvotes: 3
Reputation: 116139
I think there's a race condition:
1. fstPipe runs until just before the putMVar
2. lastPipe runs, reads everything, and then calls pipelineDone
3. pipelineDone returns False, since putMVar was not yet done
4. lastPipe will try to read from the channel
5. putMVar executes, but it's too late
Now lastPipe is stuck reading on an empty channel.
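One way to close this window, as a sketch rather than a definitive fix: keep the "done" flag in a TVar and check it in the same STM transaction as the read, so there is no gap between the check and the read. The names producer, nextItem and consumer are hypothetical; the stm and async packages are assumed, as in the question.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM

-- Producer: write every result, then set the done flag.
producer :: (a -> b) -> TChan b -> TVar Bool -> [a] -> IO ()
producer f ch done xs = do
  mapM_ (\x -> atomically $ writeTChan ch (f x)) xs
  atomically $ writeTVar done True

-- One atomic step: take the next item if there is one; otherwise return
-- Nothing if the producer is done, or block until either of them changes.
nextItem :: TChan b -> TVar Bool -> STM (Maybe b)
nextItem ch done =
  (Just <$> readTChan ch) `orElse` do
    finished <- readTVar done
    check finished        -- retries the whole transaction while not finished
    return Nothing

-- Consumer: run the action on each item until nextItem reports the end.
consumer :: (b -> IO ()) -> TChan b -> TVar Bool -> IO ()
consumer act ch done = loop
  where
    loop = do
      next <- atomically $ nextItem ch done
      case next of
        Nothing -> return ()
        Just x  -> act x >> loop

main :: IO ()
main = do
  ch <- newTChanIO
  done <- newTVarIO False
  p <- async $ producer reverse ch done (replicate 3 [1 .. 5 :: Int])
  c <- async $ consumer print ch done
  wait p
  wait c
```

The flag is set only after the last write, so a transaction that sees an empty channel together with done == True cannot miss any item.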
Upvotes: 2