Reputation: 14578
I am trying to implement a stack for use in a concurrent application. I would like the following semantics: push should never block, and pop should block the calling thread on an empty stack, but still permit pushes. I implemented it as follows (irrelevant bits at the bottom):
data Stream a = Stream a (MVar (Stream a))
data Stack a = Stack (MVar (Int, MVar (Stream a)))

popStack :: Stack a -> IO a
popStack (Stack stack) = do
  (sz, mvar) <- takeMVar stack
  mbStream <- tryTakeMVar mvar
  case mbStream of
    Nothing            -> putMVar stack (sz, mvar) >> popStack (Stack stack)
    Just (Stream x xs) -> putMVar stack (sz-1, xs) >> return x
If the stream MVar is empty I have to release the lock on the stack and try again. However, this seems like a kludge: if a thread calls pop on an empty stack, it could loop several times before being suspended, even though the MVar will not become full while that thread is being executed. Is there a better way, using MVars, to write pop with the desired semantics?
import Control.Concurrent.MVar
import Control.Monad
import Control.Concurrent
import Text.Printf
newStack :: IO (Stack a)
newStack = do
  stream <- newEmptyMVar
  Stack <$> newMVar (0, stream)

pushStack :: Stack a -> a -> IO ()
pushStack (Stack stack) val = do
  (sz, stream) <- takeMVar stack
  stream' <- newMVar (Stream val stream)
  putMVar stack (sz+1, stream')
test = do
  s <- newStack
  _ <- forkIO $ mapM_ (\a -> printf "pushing %c... " a >> pushStack s a >> threadDelay 100000) ['a' .. 'z']
  _ <- forkIO $ do
    replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
    replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
  threadDelay (5*10^6)
  putStrLn "Done"
Upvotes: 4
Views: 299
Reputation: 8153
A quick critique:
So you have learned about MVars, and you are using them to learn more.
Here StackData is either a stack with data (Full) or without data (Empty). When Empty, there is an initially empty MVar for hungry poppers to wait upon.
import Control.Concurrent.MVar
import Control.Exception (evaluate, mask)  -- evaluate and mask are used below
import Control.Monad (forM_)

type Lock = MVar ()
type Some a = (a, [a]) -- non-empty version of a list
data StackData a = Full !(Some a)
                 | Empty !Lock
data Stack a = Stack { stack :: MVar (StackData a) }
pop s = do
  x <- modifyMVar (stack s) $ \ sd ->
    case sd of
      Empty lock -> return (Empty lock, Left lock)
      Full (a, []) -> do
        lock <- newEmptyMVar
        return (Empty lock, Right a)
      Full (a, (b:bs)) -> return (Full (b, bs), Right a)
  case x of
    Left lock -> do
      withMVar lock return -- wait on the next pusher
      pop s
    Right a -> return a
push s a = modifyMVar_ (stack s) $ \ sd ->
  case sd of
    Empty lock -> do
      _ <- tryPutMVar lock ()    -- should succeed, releases waiting poppers
      evaluate (Full (a, []))    -- do not accumulate lazy thunks
    Full (b, bs) -> do
      xs <- evaluate (b:bs)      -- do not accumulate lazy thunks
      evaluate (Full (a, xs))    -- do not accumulate lazy thunks
Note: I have not tried to compile this.
EDIT: A safer version of push should only put () into the lock once it has actually succeeded in changing the stack from Empty to Full. This can be guaranteed with the 'mask' operation. The 'restore' is applied inside 'modifyMVar' but is not strictly required:
push s a = mask $ \restore -> do
  mLock <- modifyMVar (stack s) $ \ sd -> restore $
    case sd of
      Empty lock -> do
        n <- evaluate (Full (a, []))  -- do not accumulate lazy thunks
        return (n, Just lock)
      Full (b, bs) -> do
        xs <- evaluate (b:bs)         -- do not accumulate lazy thunks
        n <- evaluate (Full (a, xs))
        return (n, Nothing)
  forM_ mLock $ \ lock -> tryPutMVar lock ()  -- forM_ over Maybe in place of whenJust; releases waiting poppers
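For completeness, the answer does not give a constructor, so the following newStack is my own minimal sketch (the name and body are not part of the answer above); it assumes the Lock/StackData/Stack types defined earlier:

-- Hypothetical constructor (not in the original answer): start out Empty with
-- a fresh, empty lock so that early poppers block until the first push.
newStack :: IO (Stack a)
newStack = do
  lock <- newEmptyMVar
  Stack <$> newMVar (Empty lock)

With that in place, pop blocks on the lock of an empty stack until some push fills it, matching the semantics asked for in the question.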
Upvotes: 1
Reputation: 16645
It's not very exciting, but the simplest solution would be to use STM (if you're using cabal you'll need the stm package in your dependencies list).
import Control.Concurrent.STM

newtype Stack a = Stack (TVar [a])

new :: STM (Stack a)
new = fmap Stack $ newTVar []

put :: a -> Stack a -> STM ()
put a (Stack v) = modifyTVar' v (a:)

get :: Stack a -> STM a
get (Stack v) = do
  stack <- readTVar v
  case stack of
    []     -> retry
    (a:as) -> do
      writeTVar v as
      return a
You get the blocking behavior you want with retry, which is implemented in such a way that threads won't be awoken until the TVar changes to something other than []. This is also nice because you can now use your stack in larger composed atomic transactions, and you don't have to worry about making sure exceptions don't break your structure.
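To illustrate that composability, here is a small usage sketch of my own (moveTop and the main harness below are not part of the answer); it assumes the definitions above and combines two of the stack's STM actions into one atomic transaction, which retries as a whole if the source stack is empty:

-- Hypothetical example: atomically move the top element from one stack to
-- another; the element is never observable "in flight".
moveTop :: Stack a -> Stack a -> STM ()
moveTop from to = get from >>= \a -> put a to

main :: IO ()
main = do
  s1 <- atomically new
  s2 <- atomically new
  atomically (put 'x' s1)
  atomically (moveTop s1 s2)
  atomically (get s2) >>= print   -- prints 'x'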
If you're trying to do high-performance concurrency with lots of threads contending for reads and/or writes, you might find that this isn't clever enough. In that case you might have fun designing a structure around the fetch-and-add-based counter from atomic-primops, or seeing what else is available on hackage.
Upvotes: 2