
Reputation: 14578

Concurrent stack implementation using MVar

I am trying to implement a stack for use in a concurrent application. I would like the following semantics: push should never block, and pop should block the calling thread on an empty stack, but still permit pushes. I implemented it as follows (irrelevant bits at the bottom):

data Stream a = Stream a (MVar (Stream a))
data Stack a = Stack (MVar (Int, MVar (Stream a)))

popStack :: Stack a -> IO a 
popStack (Stack stack) = do 
  (sz, mvar) <- takeMVar stack
  mbStream <- tryTakeMVar mvar 
  case mbStream of 
    Nothing -> putMVar stack (sz, mvar) >> popStack (Stack stack)
    Just (Stream x xs) -> putMVar stack (sz-1, xs) >> return x

If the stream MVar is empty I have to release the lock on the stack and try again. However, this seems like a kludge: if a thread calls pop on an empty stack, it could loop several times before being suspended, even though the MVar will not become full while that thread is being executed. Is there a better way utilizing MVars to write pop with the desired semantics?

import Control.Concurrent.MVar 
import Control.Monad 
import Control.Concurrent
import Text.Printf

newStack :: IO (Stack a) 
newStack = do 
  stream <- newEmptyMVar 
  Stack <$> newMVar (0, stream)

pushStack :: Stack a -> a -> IO ()
pushStack (Stack stack) val = do 
  (sz, stream) <- takeMVar stack
  stream' <- newMVar (Stream val stream)
  putMVar stack (sz+1, stream')

test = do 
  s <- newStack
  _ <- forkIO $ mapM_ (\a -> printf "pushing %c... " a >> pushStack s a >> threadDelay 100000) ['a' .. 'z']
  _ <- forkIO $ do 
         replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
         replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
  threadDelay (5*10^6)
  putStrLn "Done"

Upvotes: 4

Views: 299

Answers (2)

Chris Kuklewicz

Reputation: 8153

A quick critique:

  1. "push should never block" is not something you will actually achieve, though you may have a personal definition of "block" that differs from the GHC meaning. For instance, your pushStack does block: it waits in takeMVar whenever another thread holds the stack MVar.
  2. popStack on an empty stack goes into a very busy loop, repeatedly taking and putting the Stack MVar. You do not want this; you even say "pop should block".
  3. You use takeMVar and putMVar instead of withMVar or modifyMVar. This means you are not thinking about exceptions, and the Stack will not be suitable for a general library.
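
To make point 3 concrete, here is a minimal sketch of what goes wrong. The names `unsafeUpdate`, `safeUpdate` and `demo` are made up for this illustration and are not part of the question's code. It only assumes the standard `Control.Concurrent.MVar` and `Control.Exception` modules from base.

```haskell
import Control.Concurrent.MVar
import Control.Exception

-- Hypothetical helper: manual take/put, as in the question's code.
unsafeUpdate :: MVar Int -> (Int -> IO Int) -> IO ()
unsafeUpdate mv f = do
  x  <- takeMVar mv
  x' <- f x          -- if this throws, nothing is ever put back
  putMVar mv x'

-- The same update through modifyMVar_, which puts the old value
-- back if the update action throws.
safeUpdate :: MVar Int -> (Int -> IO Int) -> IO ()
safeUpdate = modifyMVar_

-- Runs a failing update through each helper and reports whether the
-- MVar still holds a value afterwards: (unsafe result, safe result).
demo :: IO (Bool, Bool)
demo = do
  let boom _ = throwIO (userError "boom")
  a <- newMVar 0
  b <- newMVar 0
  _ <- try (unsafeUpdate a boom) :: IO (Either IOException ())
  _ <- try (safeUpdate b boom)   :: IO (Either IOException ())
  aFull <- not <$> isEmptyMVar a
  bFull <- not <$> isEmptyMVar b
  return (aFull, bFull)
```

After `demo`, the first MVar is left empty, so every later `takeMVar` on it would block forever, while the second still holds its original value.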

So you have learned about MVars, and you are using them to learn more.

Here StackData is either a stack with data (Full) or without data (Empty). When Empty, there is an initially empty MVar for hungry poppers to wait upon.

import Control.Concurrent.MVar
import Control.Exception (evaluate, mask)

type Lock = MVar ()
type Some a = (a, [a]) -- non empty version of list
data StackData a = Full !(Some a)
                 | Empty !Lock
data Stack a = Stack { stack :: MVar (StackData a) }

pop s = do
    x <- modifyMVar (stack s) $ \ sd ->
           case sd of
               Empty lock -> do
                   return (Empty lock, Left lock)
               Full (a, []) -> do
                   lock <- newEmptyMVar
                   return (Empty lock, Right a)
               Full (a, (b:bs)) -> return (Full (b, bs), Right a)
    case x of
        Left lock -> do
            withMVar lock return  -- wait on next pusher
            pop s
        Right a -> return a

push s a = modifyMVar_ (stack s) $ \ sd ->
           case sd of
               Empty lock -> do
                   _ <- tryPutMVar lock () -- should succeed, releases waiting poppers
                   evaluate (Full (a,[])) -- do not accumulate lazy thunks
               Full (b, bs) -> do
                   xs <- evaluate (b:bs) -- do not accumulate lazy thunks
                   evaluate (Full (a, xs)) -- do not accumulate lazy thunks

Note: I have not tried to compile this.

EDIT: A safer version of push needs to only put () into the lock when it has succeeded in modifying the stack from Empty to Full. This certainty can be achieved with the 'mask' operation. The 'restore' is used inside 'modifyMVar' but is not required:

push s a = mask $ \restore -> do
    mLock <- modifyMVar (stack s) $ \ sd -> restore $
           case sd of
               Empty lock -> do
                   n <- evaluate (Full (a,[])) -- do not accumulate lazy thunks
                   return (n, Just lock)
               Full (b, bs) -> do
                   xs <- evaluate (b:bs) -- do not accumulate lazy thunks
                   n <- evaluate (Full (a, xs))
                   return (n, Nothing)
    -- mapM_ over a Maybe runs the action only for Just (whenJust is not in base)
    mapM_ (\ lock -> tryPutMVar lock ()) mLock
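
Since the code above was not compiled, here is a consolidated sketch of the same idea that should build against base alone. Several things in it are assumptions rather than part of the answer: `newStack` and `demo` are invented here (the answer shows no constructor), `mask_` stands in for `mask` because the restore function is not needed, `readMVar` stands in for `withMVar lock return`, and the `evaluate` calls are left out for brevity.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad (replicateM)

type Lock = MVar ()
type Some a = (a, [a])                      -- non-empty list
data StackData a = Full !(Some a) | Empty !Lock
newtype Stack a = Stack { stack :: MVar (StackData a) }

-- Assumed constructor: an empty stack with a fresh lock for poppers.
newStack :: IO (Stack a)
newStack = do
  lock <- newEmptyMVar
  Stack <$> newMVar (Empty lock)

-- Fill the lock only after the stack has gone from Empty to Full.
push :: Stack a -> a -> IO ()
push s a = mask_ $ do
  mLock <- modifyMVar (stack s) $ \sd -> return $ case sd of
    Empty lock   -> (Full (a, []), Just lock)
    Full (b, bs) -> (Full (a, b : bs), Nothing)
  mapM_ (\lock -> tryPutMVar lock ()) mLock

-- On an empty stack, wait on the lock outside the stack MVar, then retry.
pop :: Stack a -> IO a
pop s = do
  r <- modifyMVar (stack s) $ \sd -> case sd of
    Empty lock       -> return (Empty lock, Left lock)
    Full (a, [])     -> do
      lock <- newEmptyMVar
      return (Empty lock, Right a)
    Full (a, b : bs) -> return (Full (b, bs), Right a)
  case r of
    Left lock -> readMVar lock >> pop s     -- blocks until a pusher fills the lock
    Right a   -> return a

-- A popper starts on an empty stack and is released by later pushes.
demo :: IO String
demo = do
  s    <- newStack
  done <- newEmptyMVar
  _ <- forkIO $ replicateM 3 (pop s) >>= putMVar done
  threadDelay 100000
  mapM_ (push s) "abc"
  takeMVar done
```

The order of the three popped characters depends on how the two threads interleave, so `demo` returns some permutation of "abc".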

Upvotes: 1


Reputation: 16645

It's not very exciting, but the simplest solution would be to use STM (if you're using cabal you'll need the stm package in your dependencies list).

import Control.Concurrent.STM

newtype Stack a = Stack (TVar [a])

new :: STM (Stack a)
new = fmap Stack $ newTVar []

put :: a -> Stack a -> STM ()
put a (Stack v) = modifyTVar' v (a:)

get :: Stack a -> STM a
get (Stack v) = do
    stack <- readTVar v
    case stack of
         [] -> retry
         (a:as) -> do writeTVar v as
                      return a

You get the blocking behavior you want with retry, which is implemented in such a way that threads won't be awoken until the TVar changes to something other than []. This is also nice because you can now use your stack in larger composed atomic transactions, and you don't have to worry about making sure exceptions don't break your structure.
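
As a rough usage sketch, the definitions are repeated below so the example stands alone, and `transfer` and `demo` are hypothetical names added for illustration. It shows a consumer blocking in `get` via `retry`, and a composed transaction that moves an element between two stacks atomically.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

newtype Stack a = Stack (TVar [a])

new :: STM (Stack a)
new = Stack <$> newTVar []

put :: a -> Stack a -> STM ()
put a (Stack v) = modifyTVar' v (a:)

get :: Stack a -> STM a
get (Stack v) = do
  xs <- readTVar v
  case xs of
    []       -> retry                 -- block until the TVar changes
    (a : as) -> writeTVar v as >> return a

-- Hypothetical composed transaction: pop from one stack and push onto
-- another as a single atomic step.
transfer :: Stack a -> Stack a -> STM ()
transfer from to = get from >>= \x -> put x to

-- The forked thread blocks in transfer until the main thread pushes.
demo :: IO Char
demo = do
  src    <- atomically new
  dst    <- atomically new
  result <- newEmptyMVar
  _ <- forkIO $ atomically (transfer src dst) >> atomically (get dst) >>= putMVar result
  threadDelay 100000
  atomically (put 'x' src)
  takeMVar result
```

No other thread can observe a state in which the element has left `src` but not yet arrived in `dst`.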

If you're trying to do high-performance concurrency with lots of threads contending for reads and/or writes, you might find that this isn't clever enough. In that case you might have fun designing a structure based around the fetch-and-add-based counter from atomic-primops, or seeing what else is available on Hackage.

Upvotes: 2
