fcennecf

Reputation: 213

Parallel Haskell. Rate-Limiting the Producer

In Parallel and Concurrent Programming in Haskell, Simon Marlow defines a Stream a type based on the following data type, together with a producer and a consumer (only the producer is shown here):

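import Control.DeepSeq (NFData (..))
import Control.Monad.Par

data IList a
  = Nil
  | Cons a (IVar (IList a))

-- needed because put fully evaluates the value it writes
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b

type Stream a = IVar (IList a)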

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
      var <- new
      fork $ loop xs var
      return var
    where
      loop [] var = put var Nil
      loop (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop xs tail
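The matching consumer is not quoted above. Here is a minimal sketch of one in the style of the book's streamFold. It assumes the monad-par and deepseq packages, and it repeats the type, the instance and the producer so that the block compiles on its own:

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))

-- put fully evaluates its argument, so IList needs an NFData instance;
-- forcing the IVar field only evaluates the IVar itself to WHNF.
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b

type Stream a = IVar (IList a)

-- Producer: nothing stops it from running arbitrarily far ahead.
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
  where
    loop [] var = put var Nil
    loop (x : rest) var = do
      tl <- new
      put var (Cons x tl)
      loop rest tl

-- Consumer: folds over the stream, blocking on each IVar until it is filled.
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
```

For example, runPar (streamFromList [1 .. 10 :: Int] >>= streamFold (+) 0) evaluates to 55.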

Later, he mentions the drawbacks of this approach and proposes a solution:

In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the IList type:

data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

However, he does not complete the implementation:

I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).

The same question has been asked on the mailing list, but nobody gave an answer.

So how could one limit the rate of the producer?

Upvotes: 4

Views: 430

Answers (3)

Camale H

Reputation: 1

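In this implementation, the Fork is placed in the middle of the produced list: every chunk of chunkSize elements starts with a Fork whose suspended computation produces the next chunk.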

import Control.DeepSeq
import Control.Monad.Par

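data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

-- put requires an NFData instance; the suspended Par () in a Fork
-- cannot be forced, so rnf only evaluates the list part.
instance NFData a => NFData (IList a) where
    rnf Nil = ()
    rnf (Cons x xs) = rnf x `seq` rnf xs
    rnf (Fork _ l) = rnf l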

type Stream a = IVar (IList a)

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamFold (+) 0)
-- 55

streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList chunkSize xs = do
    dt <- new
    dl <- new
    put dl xs
    fork $ next chunkSize dt dl
    return dt
  where
    next :: NFData a => Int -> Stream a -> IVar [a] -> Par ()
    next 1 dt dl = do 
        ilist <- get dl 
        case ilist of 
            [] -> put dt Nil 
            (x:xs) -> do 
                delaytail <- new 
                delaylist <- new 
                put delaylist xs
                put dt (Fork (next 1 delaytail delaylist) (Cons x delaytail))
    next chunkSize dt dl = do
        ilist <- get dl
        case ilist of
            [] -> put dt Nil
            (x : xs) -> do
                delaytail <- new
                delaylist <- new
                tail <- new
                put
                    dt
                    ( Fork
                        (next chunkSize delaytail delaylist)
                        (Cons x tail)
                    )
                loop xs tail delaytail delaylist (chunkSize - 2)
    loop :: NFData a => [a] -> Stream a -> Stream a -> IVar [a] -> Int -> Par ()
    loop [] var _ dl _ = do
        put var Nil
        put dl []
    loop (x : xs) var dt dl count =
        if count /= 0
            then do
                tail <- new
                put var (Cons x tail)
                loop xs tail dt dl (count - 1)
            else do
                put var (Cons x dt)
                put dl xs

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = do
    ilist <- get instrm
    case ilist of
        Nil -> return acc
        Cons h t -> streamFold fn (fn acc h) t
        Fork p Nil -> return acc
        Fork p (Cons h t) -> do
            fork p
            streamFold fn (fn acc h) t

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamMap (*2)) >>= (streamFold (+) 0)
-- 110

streamMap :: (NFData a, NFData b) => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
    outstrm <- new
    fork $ init fn instrm outstrm
    return outstrm
  where
    init :: (NFData a, NFData b) => (a -> b) -> Stream a -> Stream b -> Par ()
    init fn instrm outstrm = do
        ilst <- get instrm
        case ilst of
            Nil -> put outstrm Nil
            Cons h t -> do
                newtl <- new
                put outstrm (Cons (fn h) newtl)
                init fn t newtl
            Fork p Nil -> put outstrm Nil
            Fork p (Cons h t) -> do
                fork p
                slist <- get t
                case slist of
                    Nil -> do
                        newtl <- new
                        put newtl Nil
                        put outstrm (Cons (fn h) newtl)
                    Cons h1 t1 -> do
                        newtl <- new
                        delaytail <- new
                        delaystrm <- new
                        put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) newtl))
                        loopCons fn h1 t1 newtl delaytail delaystrm
                    Fork p1 Nil -> do
                        delaytail <- new
                        put outstrm (Fork (put delaytail Nil) (Cons (fn h) delaytail))
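                    Fork p1 (Cons h1 t1) -> do
                        delaytail <- new
                        delaystrm <- new
                        -- hand the rest of the input (including its Fork) to the
                        -- delayed init; otherwise it blocks forever on delaystrm
                        put delaystrm slist
                        put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) delaytail))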
    loopCons :: (NFData a, NFData b) => (a -> b) -> a -> Stream a -> Stream b -> Stream b -> Stream a -> Par ()
    loopCons fn h t var dl ds = do
        tlist <- get t
        case tlist of
            Nil -> do
                newtl <- new
                put newtl Nil
                put var (Cons (fn h) newtl)
                put ds Nil
            Cons h1 t1 -> do
                newtl <- new
                put var (Cons (fn h) newtl)
                loopCons fn h1 t1 newtl dl ds
            Fork p Nil -> do
                newtl <- new
                put newtl Nil
                put var (Cons (fn h) newtl)
            Fork p (Cons h1 t1) -> do
                put ds tlist
                put var (Cons (fn h) dl)

Upvotes: 0

ivan

Reputation: 6322

I think the following is a valid implementation.

{-# LANGUAGE BangPatterns #-}

import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)
import Control.DeepSeq   (NFData, rnf)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ b) = rnf b  -- never run the suspended Par () here; only force the IVar

type Stream a = IVar (IList a)

main :: IO ()
main = print $ sum (pipeline [1 .. 10000])

pipeline :: [Int] -> [Int]
pipeline list = runPar $ do
  strm <- streamFromList list 100 200
  xs   <- streamFold (\x y -> (y : x)) [] strm
  return (reverse xs)

streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
    var <- new
    fork $ loop xs var k
    return var
  where
    loop [] var _ = put var Nil
    loop xs var 0 = do
      var' <- new
      put_ var (Fork (loop xs var' n) var')
    loop (x:xs) var i = do
      tail <- new
      put var (Cons x tail)
      loop xs tail (i - 1)

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
  ilst <- get strm
  case ilst of
    Nil      -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p s -> fork p >> streamFold fn acc s

Here, streamFromList (the producer) writes values to the stream while streamFold (the consumer) consumes them in parallel. After the first k values, streamFromList puts a Fork into the stream. This Fork contains the computation that produces the next n values, together with the stream from which those values can be consumed.

At this point, the consumer has a chance to catch up if it's fallen behind the producer. Upon reaching the Fork, it forks the contained producer. Again both producer and consumer can proceed in parallel until the producer, after another n values, adds another Fork to the stream and the cycle repeats.
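The cycle described above can be modelled purely, without monad-par. This sketch mirrors the three loop clauses of this answer's streamFromList. The names Cell, ForkMark, Value and forkLayout are hypothetical, and it assumes n >= 1, because with n == 0 the real producer would emit Forks forever:

```haskell
-- Hypothetical pure model: ForkMark stands for a Fork cell, Value x for Cons x.
data Cell a = ForkMark | Value a
  deriving (Eq, Show)

-- Cells written by streamFromList xs k n: k values, then a Fork,
-- then n values behind every further Fork.
forkLayout :: Int -> Int -> [a] -> [Cell a]
forkLayout k n = go k
  where
    go _ [] = []                            -- list exhausted: Nil, no trailing Fork
    go 0 ys = ForkMark : go n ys            -- distance reached: the next n values wait behind a Fork
    go i (y : ys) = Value y : go (i - 1) ys
```

With the pipeline's parameters (k = 100, n = 200) and [1 .. 10000], the model contains 50 Forks. That is the number of times the consumer has to fork the producer.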

Upvotes: 0

Zeta

Reputation: 105995

The important part lies in the loop function:

  loop [] var = put var Nil
  loop (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop xs tail

We need to add the fork distance f and the chunk size c as parameters:

  loop _ _ [] var = put var Nil
  loop 0 c (x:xs) var = -- see below
  loop f c (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop (f-1) c xs tail

The fork distance gets reduced in every iteration. What do we need to do when the fork distance is zero? We provide a Fork op t, where op continues to produce the list:

  loop 0 c (x:xs) var = do
    tail <- new
    let op = loop c c xs tail  -- the next chunk restarts with fork distance c
    put var (Fork op (Cons x tail))

Note that we don't use Fork if the list is empty. That would be possible, but a little silly: after all, there is nothing left to produce. Changing streamFromList is now simple:

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new                            
  fork $ loop f c xs var                 
  return var 
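The rate limit can be checked directly. This sketch assembles the producer, with the recursive call in the Fork case taking the chunk size as both the new fork distance and the chunk size. It adds a hypothetical helper, availablePrefix, that reads only the cells already written and never runs the suspended producer. It assumes the monad-par and deepseq packages:

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x t) = rnf x `seq` rnf t
  rnf (Fork _ l) = rnf l -- the suspended Par () is not evaluated

type Stream a = IVar (IList a)

-- f: fork distance before the first Fork, c: distance between later Forks.
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
  where
    loop _ _ [] var = put var Nil
    loop 0 chunk (x : rest) var = do
      tl <- new
      -- stop here: whoever forks the suspended loop fills tl
      put var (Fork (loop chunk chunk rest tl) (Cons x tl))
    loop dist chunk (x : rest) var = do
      tl <- new
      put var (Cons x tl)
      loop (dist - 1) chunk rest tl

-- Hypothetical helper: collect the elements written so far, stopping
-- at the first Fork WITHOUT forking its suspended producer.
availablePrefix :: Stream a -> Par [a]
availablePrefix strm = do
  ilst <- get strm
  case ilst of
    Nil -> return []
    Cons h t -> (h :) <$> availablePrefix t
    Fork _ (Cons h _) -> return [h]
    Fork _ _ -> return []
```

Even for the infinite input [(1 :: Int) ..], runPar (streamFromList 2 3 [(1 :: Int) ..] >>= availablePrefix) yields [1, 2, 3]. Without a consumer that forks the Fork, the producer stops after f + 1 elements.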

Now, in order to use it, we need to change the case in streamFold:

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t          -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _                 -> return acc

Remember, our streamFromList never puts an empty list inside a Fork, but just in case, we match that case (and Nil) with the wildcard.

What do we need to do if we encounter a Fork with data? First, we fork the Par () operation so that it fills t; only then can we continue consuming t. So our last case is

    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
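Putting the pieces of this answer together gives a complete producer/consumer pair. This sketch assumes the monad-par and deepseq packages. In the Fork case, the recursive call passes the chunk size as both the new fork distance and the chunk size. The helper streamToList is hypothetical and exists only to make the result easy to check:

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x t) = rnf x `seq` rnf t
  rnf (Fork _ l) = rnf l -- the suspended Par () is not evaluated

type Stream a = IVar (IList a)

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
  where
    loop _ _ [] var = put var Nil
    loop 0 chunk (x : rest) var = do
      tl <- new
      put var (Fork (loop chunk chunk rest tl) (Cons x tl))
    loop dist chunk (x : rest) var = do
      tl <- new
      put var (Cons x tl)
      loop (dist - 1) chunk rest tl

-- The consumer restarts the paused producer whenever it reaches a Fork.
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _ -> return acc

-- Hypothetical helper: collect the whole stream into a list, in order.
streamToList :: Stream a -> Par [a]
streamToList s = reverse <$> streamFold (flip (:)) [] s
```

For example, runPar (streamFromList 2 3 [1 .. 10 :: Int] >>= streamFold (+) 0) evaluates to 55. Distances of 0 also work; they put every element behind its own Fork.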

streamMap is analogous; the only difference is that its loop again takes the additional fork-distance and chunk-size parameters, as in streamFromList.
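One way to fill in streamMap along these lines is sketched below. It is hypothetical, not code from the answer; in particular, the parameter order f c fn instrm is an assumption. It must do two things. When it reads an upstream Fork, it forks the suspended upstream producer, just like streamFold. When its own fork distance reaches zero, it emits the mapped element behind a Fork of its own and pauses until the downstream consumer forks it. It assumes the monad-par and deepseq packages:

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x t) = rnf x `seq` rnf t
  rnf (Fork _ l) = rnf l -- the suspended Par () is not evaluated

type Stream a = IVar (IList a)

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
  where
    loop _ _ [] var = put var Nil
    loop 0 chunk (x : rest) var = do
      tl <- new
      put var (Fork (loop chunk chunk rest tl) (Cons x tl))
    loop dist chunk (x : rest) var = do
      tl <- new
      put var (Cons x tl)
      loop (dist - 1) chunk rest tl

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _ -> return acc

-- Hypothetical rate-limited streamMap: f is the distance to its first Fork,
-- c the distance between later Forks (same convention as streamFromList).
streamMap :: NFData b => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap f c fn instrm = do
  outstrm <- new
  fork $ loop f instrm outstrm
  return outstrm
  where
    -- read the next input cell, then act on it
    loop dist inp out = get inp >>= \ilst -> step dist ilst out

    step _ Nil out = put out Nil
    -- upstream Fork: start the suspended upstream producer and carry on
    step dist (Fork p l) out = fork p >> step dist l out
    -- own fork distance reached: emit this element behind a Fork and pause
    step 0 (Cons h t) out = do
      tl <- new
      put out (Fork (loop c t tl) (Cons (fn h) tl))
    step dist (Cons h t) out = do
      tl <- new
      put out (Cons (fn h) tl)
      loop (dist - 1) t tl
```

For example, runPar (streamFromList 2 3 [1 .. 10 :: Int] >>= streamMap 2 3 (* 2) >>= streamFold (+) 0) evaluates to 110. Each stage only runs ahead of the next one by its own fork distance.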

Upvotes: 7
