In Parallel and Concurrent Programming in Haskell, Simon Marlow defines a Stream a type based on the following data type, together with some producers and consumers:
data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var

loop [] var = put var Nil
loop (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop xs tail
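For comparison, the unbounded version can be run on its own. The sketch below assumes the monad-par package (Control.Monad.Par) and deepseq; the NFData instance, the streamFold consumer and the helper sumStream are written in the style of the book's code but are assumptions here, not quotes. Nothing in it limits how far the producer runs ahead of the fold.

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))

-- monad-par gives IVar an NFData instance, so forcing stops at the tail IVar.
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b

type Stream a = IVar (IList a)

-- Unbounded producer: it writes cells as fast as it can.
streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
  where
    loop [] var = put var Nil
    loop (y : ys) var = do
      tl <- new
      put var (Cons y tl)
      loop ys tl

-- Strict left fold that consumes the stream cell by cell.
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t

-- Hypothetical helper: run producer and consumer in one Par computation.
sumStream :: [Int] -> Int
sumStream xs = runPar (streamFromList xs >>= streamFold (+) 0)

main :: IO ()
main = print (sumStream [1 .. 10]) -- prints 55
```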
Later, he mentions the drawbacks of this approach and proposes a solution:
In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the type:
data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)
However, he doesn't finish this approach:
I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify …, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList …
The same question has been asked on the mailing list, but nobody gave an answer.
So how could one limit the rate of the producer?
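In this implementation, Fork cells are placed in the middle of the produced list: every chunk of chunkSize elements starts with a Fork that carries the producer of the next chunk.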
import Control.DeepSeq
import Control.Monad.Par
import Prelude hiding (init) -- init is redefined below

data IList a
  = Nil -- need to be NFData
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons x xs) = rnf x `seq` rnf xs
  rnf (Fork c l) = rnf l

type Stream a = IVar (IList a)

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamFold (+) 0)
-- 55
streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList chunkSize xs = do
  dt <- new
  dl <- new
  put dl xs
  fork $ next chunkSize dt dl
  return dt

-- Produces one chunk. The chunk starts with a Fork holding the producer of
-- the next chunk, which reads the remaining input from the IVar dl.
next :: NFData a => Int -> Stream a -> IVar [a] -> Par ()
next 1 dt dl = do
  ilist <- get dl
  case ilist of
    [] -> put dt Nil
    (x : xs) -> do
      delaytail <- new
      delaylist <- new
      put delaylist xs
      put dt (Fork (next 1 delaytail delaylist) (Cons x delaytail))
next chunkSize dt dl = do
  ilist <- get dl
  case ilist of
    [] -> put dt Nil
    (x : xs) -> do
      delaytail <- new
      delaylist <- new
      tail <- new
      put dt
        ( Fork
            (next chunkSize delaytail delaylist)
            (Cons x tail)
        )
      loop xs tail delaytail delaylist (chunkSize - 2)

-- Writes the rest of the chunk, links its last cell to dt (the start of the
-- next chunk) and hands the remaining input to the next producer via dl.
loop :: NFData a => [a] -> Stream a -> Stream a -> IVar [a] -> Int -> Par ()
loop [] var _ dl _ = do
  put var Nil
  put dl []
loop (x : xs) var dt dl count =
  if count /= 0
    then do
      tail <- new
      put var (Cons x tail)
      loop xs tail dt dl (count - 1)
    else do
      put var (Cons x dt)
      put dl xs

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = do
  ilist <- get instrm
  case ilist of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p Nil -> return acc
    Fork p (Cons h t) -> do
      fork p
      streamFold fn (fn acc h) t

-- >>> runPar $ (streamFromList 3 [1 .. 10]) >>= (streamMap (*2)) >>= (streamFold (+) 0)
-- 110
streamMap :: (NFData a, NFData b) => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
  outstrm <- new
  fork $ init fn instrm outstrm
  return outstrm

init :: (NFData a, NFData b) => (a -> b) -> Stream a -> Stream b -> Par ()
init fn instrm outstrm = do
  ilst <- get instrm
  case ilst of
    Nil -> put outstrm Nil
    Cons h t -> do
      newtl <- new
      put outstrm (Cons (fn h) newtl)
      init fn t newtl
    Fork p Nil -> put outstrm Nil
    Fork p (Cons h t) -> do
      fork p
      slist <- get t
      case slist of
        Nil -> do
          newtl <- new
          put newtl Nil
          put outstrm (Cons (fn h) newtl)
        Cons h1 t1 -> do
          newtl <- new
          delaytail <- new
          delaystrm <- new
          put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) newtl))
          loopCons fn h1 t1 newtl delaytail delaystrm
        Fork p1 Nil -> do
          delaytail <- new
          put outstrm (Fork (put delaytail Nil) (Cons (fn h) delaytail))
        Fork p1 (Cons h1 t1) -> do
          delaytail <- new
          delaystrm <- new
          -- hand the next chunk to the delayed init; otherwise it would
          -- block forever on delaystrm
          put delaystrm slist
          put outstrm (Fork (init fn delaystrm delaytail) (Cons (fn h) delaytail))

loopCons :: (NFData a, NFData b) => (a -> b) -> a -> Stream a -> Stream b -> Stream b -> Stream a -> Par ()
loopCons fn h t var dl ds = do
  tlist <- get t
  case tlist of
    Nil -> do
      newtl <- new
      put newtl Nil
      put var (Cons (fn h) newtl)
      put ds Nil
    Cons h1 t1 -> do
      newtl <- new
      put var (Cons (fn h) newtl)
      loopCons fn h1 t1 newtl dl ds
    Fork p Nil -> do
      newtl <- new
      put newtl Nil
      put var (Cons (fn h) newtl)
    Fork p (Cons h1 t1) -> do
      put ds tlist
      put var (Cons (fn h) dl)
I think the following is a valid implementation.
{-# LANGUAGE BangPatterns #-}
import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)
import Control.DeepSeq (NFData, rnf)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  -- Don't run the suspended producer while forcing. Fork cells are only
  -- written with put_, so this case is not reached by the code below.
  rnf (Fork _ b) = rnf b

type Stream a = IVar (IList a)

main :: IO ()
main = print $ sum (pipeline [1 .. 10000])

pipeline :: [Int] -> [Int]
pipeline list = runPar $ do
  strm <- streamFromList list 100 200
  xs <- streamFold (\x y -> (y : x)) [] strm
  return (reverse xs)

-- k: number of values before the first Fork; n: number of values per later chunk
streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
  var <- new
  fork $ loop xs var k
  return var
  where
    loop [] var _ = put var Nil
    loop xs var 0 = do
      var' <- new
      put_ var (Fork (loop xs var' n) var')
    loop (x:xs) var i = do
      tail <- new
      put var (Cons x tail)
      loop xs tail (i - 1)

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc strm = do
  ilst <- get strm
  case ilst of
    Nil -> return acc
    Cons h t -> streamFold fn (fn acc h) t
    Fork p s -> fork p >> streamFold fn acc s
Here, streamFromList (the producer) writes values to the stream while streamFold consumes them in parallel. After the first k values, streamFromList puts a Fork in the stream. This Fork contains the computation that produces the next n values, and the stream from which those values can be consumed.
At this point, the consumer has a chance to catch up if it has fallen behind the producer. Upon reaching the Fork, it forks the contained producer. Again, producer and consumer proceed in parallel until the producer, after another n values, adds another Fork to the stream, and the cycle repeats.
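To see the rate limiting at work, here is a sketch that reuses this answer's type and producer together with two hypothetical consumers, streamPrefix and streamChunks (neither is part of the answer). A consumer that never forks the producer stored in a Fork sees only the first k values, because nothing else is ever produced; a consumer that forks every Fork it meets receives the rest in chunks of n values. Leaving the Fork's Par () action unforced in the NFData instance is an assumption of this sketch.

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, put_, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IVar (IList a))

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ b) = rnf b -- the suspended producer is not run

type Stream a = IVar (IList a)

-- Producer from the answer: k values first, then n values per resumption.
streamFromList :: NFData a => [a] -> Int -> Int -> Par (Stream a)
streamFromList xs k n = do
  var <- new
  fork $ loop xs var k
  return var
  where
    loop [] var _ = put var Nil
    loop ys var 0 = do
      var' <- new
      put_ var (Fork (loop ys var' n) var')
    loop (y : ys) var i = do
      tl <- new
      put var (Cons y tl)
      loop ys tl (i - 1)

-- Hypothetical consumer that never forks: it stops at the first Fork.
streamPrefix :: Stream a -> Par [a]
streamPrefix strm = do
  ilst <- get strm
  case ilst of
    Cons h t -> (h :) <$> streamPrefix t
    _ -> return []

-- Hypothetical consumer that forks every suspended producer it meets and
-- groups the values that arrive between consecutive Forks.
streamChunks :: Stream a -> Par [[a]]
streamChunks = go []
  where
    go cur strm = do
      ilst <- get strm
      case ilst of
        Nil -> return [reverse cur]
        Cons h t -> go (h : cur) t
        Fork p s -> do
          fork p
          rest <- go [] s
          return (reverse cur : rest)

main :: IO ()
main = do
  print (runPar (streamFromList [1 .. 10 :: Int] 3 4 >>= streamPrefix)) -- [1,2,3]
  print (runPar (streamFromList [1 .. 10 :: Int] 3 4 >>= streamChunks)) -- [[1,2,3],[4,5,6,7],[8,9,10]]
```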
The important part is the loop:
loop [] var = put var Nil
loop (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop xs tail
We need to add the fork distance f and the chunk size c as parameters:
loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = -- see below
loop f c (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop (f-1) c xs tail
The fork distance is decremented in every iteration. What do we need to do when it reaches zero? We provide a Fork op t, where op continues producing the list (starting again with fork distance c):
loop 0 c (x:xs) var = do
  tail <- new
  let op = loop c c xs tail
  put var (Fork op (Cons x tail))
Note that we don't use Fork if the list is empty. That would be possible, but a little silly: after all, there's nothing left to produce. Changing streamFromList is now simple:
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
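Assembled into one module, the producer side of this answer can be tried on its own. The sketch below resumes with loop c c xs tail, so c serves as both the next fork distance and the chunk size; the NFData instance (which leaves the Fork's Par () action unforced) and the segments consumer are assumptions, not part of the answer. segments starts a new group at every Fork and forks the suspended producer, the same way the streamFold change described next does.

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

-- Assumption: only the list part of a Fork is forced, never its Par () action.
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ l) = rnf l

type Stream a = IVar (IList a)

-- f: remaining fork distance, c: distance used after each resumption
loop :: NFData a => Int -> Int -> [a] -> Stream a -> Par ()
loop _ _ [] var = put var Nil
loop 0 c (x : xs) var = do
  tl <- new
  let op = loop c c xs tl -- fills tl once a consumer forks it
  put var (Fork op (Cons x tl))
loop f c (x : xs) var = do
  tl <- new
  put var (Cons x tl)
  loop (f - 1) c xs tl

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var

-- Hypothetical inspection consumer: the element carried by a Fork opens a
-- new group, and the suspended producer is forked so the stream continues.
segments :: Stream a -> Par [[a]]
segments = go []
  where
    go cur strm = do
      ilst <- get strm
      case ilst of
        Nil -> return [reverse cur]
        Cons h t -> go (h : cur) t
        Fork p (Cons h t) -> do
          fork p
          rest <- go [h] t
          return (reverse cur : rest)
        Fork _ _ -> return [reverse cur]

main :: IO ()
main = print (runPar (streamFromList 3 4 [1 .. 10 :: Int] >>= segments))
-- prints [[1,2,3],[4,5,6,7,8],[9,10]]
```

With f = 3 and c = 4, each resumed group holds c + 1 values: the element carried by the Fork plus c further Cons cells.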
Now, in order to use it, we need to change the case expression in streamFold:
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _ -> return acc
Remember that our streamFromList never puts an empty list into a Fork; just in case, though, that pattern (and Nil) is matched by the wildcard.
What do we need to do if we encounter a Fork with data? First of all, we need to fork the Par () operation so that it fills t, and then we can continue with t. So our last case is
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
… is analogous; the only difference is that you again add the extra parameters to its loop, as in streamFromList.
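As a sketch of how the same idea might carry over to a stream transformer, here is a hypothetical streamMap (named after the function in Marlow's exercise) built on the Fork type and the loop from this answer; f and c play the same roles as in streamFromList. Whenever it meets an upstream Fork it forks the suspended upstream producer, and after every run of c cells it emits its own Fork and stops. A downstream consumer that stops forking therefore stalls the mapper and, in turn, the original producer. Turning Fork cells whose list part is not a Cons into Nil is an assumption, since the producers here never emit them.

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

-- Assumption: only the list part of a Fork is forced, never its Par () action.
instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ l) = rnf l

type Stream a = IVar (IList a)

-- Producer from this answer: f = fork distance, c = chunk size.
loop :: NFData a => Int -> Int -> [a] -> Stream a -> Par ()
loop _ _ [] var = put var Nil
loop 0 c (x : xs) var = do
  tl <- new
  put var (Fork (loop c c xs tl) (Cons x tl))
loop f c (x : xs) var = do
  tl <- new
  put var (Cons x tl)
  loop (f - 1) c xs tl

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _ -> return acc

-- Hypothetical streamMap using the same fork distance / chunk size scheme.
streamMap :: NFData b => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap f c fn instrm = do
  outstrm <- new
  fork $ mapLoop f instrm outstrm
  return outstrm
  where
    -- Read one input cell, resuming the upstream producer at a Fork.
    mapLoop k inp out = do
      ilst <- get inp
      case ilst of
        Cons h t -> emit k h t out
        Fork p (Cons h t) -> fork p >> emit k h t out
        _ -> put out Nil
    -- Write one output cell; at distance 0, suspend behind a Fork.
    emit 0 h t out = do
      tl <- new
      put out (Fork (mapLoop c t tl) (Cons (fn h) tl))
    emit k h t out = do
      tl <- new
      put out (Cons (fn h) tl)
      mapLoop (k - 1) t tl

main :: IO ()
main = print $ runPar $ do
  s <- streamFromList 3 4 [1 .. 10 :: Int]
  m <- streamMap 2 2 (* 2) s
  streamFold (+) 0 m -- prints 110
```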
