Zhiltsoff Igor

mapA for stream processors which could block (asynchronous circuits)

Note: If you haven't come across asynchronous circuits yet, my two other Stack Overflow posts on them, "An ArrowCircuit instance for stream processors which could block" and "Hughes' Fibonacci stream", could come in quite handy, in addition to John Hughes' article where he introduced them.


John Hughes introduced the following type for asynchronous circuits in his well-known paper "Generalising Monads to Arrows":

import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

data StreamProcessor a b = Get (a -> StreamProcessor a b) |
                           Put b    (StreamProcessor a b) |
                           Halt

instance Category StreamProcessor where
    id = Get (\ x -> Put x id)

    Put c bc . ab = Put c (bc . ab)                          {- 1 -}
    Get bbc . Put b ab = (bbc b) . ab                        {- 2 -}
    Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)     {- 3 -}
    Get bbc . Halt = Halt                                    {- 4 -}
    Halt . ab = Halt                                         {- 5 -}

bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt             = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []

instance ArrowChoice StreamProcessor where
    left (Put b ab) = Put (Left b) (left ab)
    left (Get aab)  = Get $ \ a -> case a of 
                                       Left a' -> (left . aab) a'
                                       Right d -> Put (Right d) (left $ Get aab)
    left Halt = Halt

In another paper, "Programming with Arrows", he wrote down a combinator that does for arrows (well, for ArrowChoice instances) what mapM does for monads:

biteOff :: [a] -> Either [a] (a, [a])
biteOff []       = Left []
biteOff (x : xs) = Right (x, xs)

mapA :: ArrowChoice k => k a b -> k [a] [b] 
mapA k = let
            go :: ArrowChoice k => k a b -> k (a, [a]) [b]
            go k = k *** mapA k >>^ uncurry (:)
         in arr biteOff >>> (arr (const []) ||| go k)
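For contrast, here is a small sketch that uses only base's Control.Arrow. It shows that this same mapA behaves like map for plain functions and like mapM for Kleisli arrows. Neither of these arrows has to inspect the recursive mapA k before producing a result, so the recursion is harmless there. The names incremented and signChoices are hypothetical, chosen for illustration.

```haskell
import Control.Arrow

-- Hughes' helper: an empty list goes Left, a non-empty one is split into head and tail.
biteOff :: [a] -> Either [a] (a, [a])
biteOff []       = Left []
biteOff (x : xs) = Right (x, xs)

-- mapA as written in the question: the recursion over the list happens inside the arrow.
mapA :: ArrowChoice k => k a b -> k [a] [b]
mapA k = let go :: ArrowChoice k => k a b -> k (a, [a]) [b]
             -- Process the head with k', the tail with mapA k', then cons the results.
             go k' = k' *** mapA k' >>^ uncurry (:)
         in  arr biteOff >>> (arr (const []) ||| go k)

-- Plain functions: mapA behaves like map.
incremented :: [Int]
incremented = mapA (+ 1) [1, 2, 3]

-- Kleisli arrows over the list monad: mapA behaves like mapM.
signChoices :: [[Int]]
signChoices = runKleisli (mapA (Kleisli (\x -> [x, negate x]))) [1, 2]
```

Here incremented is [2, 3, 4], and signChoices lists every choice of sign for each element, in the same order as mapM.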

Long story short, it does not work on StreamProcessors:

GHCi> Put x sp = Put [1, 2, 3] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow
GHCi> Put x sp = Put [] Halt >>> mapA id
GHCi> x
*** Exception: stack overflow

Interestingly enough, it is not just the value inside the Put that diverges; the outermost constructor cannot be determined either:

GHCi> Get f = Put [1, 2, 3] Halt >>> mapA id
GHCi> Put x sp = f ()
GHCi> x
*** Exception: stack overflow

So, here is how I understand what's happening:

StreamProcessor has rather cumbersome rules for composition: to compose two arrows, we often have to know the constructors of both. Therefore, when composition unfolds into an infinite series, as it does in the recursive mapA, we can only hope that rules {- 1 -} and {- 5 -} for (.) are the ones that fire, since they are the only ones that look at the left operand alone. We aren't that lucky.

Since this didn't work out, I thought about writing a dedicated mapSP for StreamProcessors only. However, that doesn't seem like a great idea either:

Say we apply mapSP to some sp and feed it the list [a1, a2]. sp blocks on a1 but keeps going on a2. I see no way to deal with this.

So, is my understanding of why the generic mapA does not work for StreamProcessors right? Is there some mapSP :: StreamProcessor a b -> StreamProcessor [a] [b] that actually makes sense, not just type checks?

Answers (1)

DDub

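I don't think you're going to get a mapA as general as you want, but I'm also not sure your reasoning for why not is correct. As I see it, mapA can't work because StreamProcessor is strict in its shape: composition, first and left all have to pattern-match on the constructors of their arguments. mapA, as it's defined, instead requires it to be lazy in its shape, because mapA k is built out of mapA k itself, so its outermost constructor must be determinable without inspecting the recursive occurrence. On the other hand, the situation you describe, with one part blocking while another continues, is actually not a problem.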

To see why this "length-2 list" example is really not a problem, we need only look as far as bypass. After all, mapA f specialized to a length-2 list is no different from:

mapA2 :: Arrow k => k a b -> k [a] [b] -- These lists must be length 2!
mapA2 f = arr (\[a,b] -> (a,b)) >>> (f *** f) >>> arr (\(a,b) -> [a,b])
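To make concrete the point that mapA2 is nothing but *** plus some list plumbing, here is a sketch that runs it at two ordinary arrows from base. The names scaled and sequenced are hypothetical, used only for illustration. The lambda \[a, b] -> (a, b) is deliberately partial, exactly as in the definition above.

```haskell
import Control.Arrow

-- mapA2 from the answer: only valid for lists of length exactly 2.
mapA2 :: Arrow k => k a b -> k [a] [b]
mapA2 f = arr (\[a, b] -> (a, b)) >>> (f *** f) >>> arr (\(a, b) -> [a, b])

-- With plain functions, mapA2 f is map f on two-element lists.
scaled :: [Int]
scaled = mapA2 (* 10) [1, 2]

-- With Kleisli arrows, the two copies of f are sequenced like mapM.
sequenced :: [[Int]]
sequenced = runKleisli (mapA2 (Kleisli (\x -> [x, negate x]))) [1, 2]
```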

Now, let's construct a couple of StreamProcessors, one that's a Put and one that's a Get:

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

sumTwo :: StreamProcessor Int Int
sumTwo = Get $ \x -> Get $ \y -> Put (x+y) sumTwo

And it's no problem to combine them with ***:

> runStreamProcessor (putOnes *** sumTwo) (zip (repeat 1) [1..10])
[(1,3),(1,7),(1,11),(1,15),(1,19)]
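runStreamProcessor is used above but not defined in this thread. Below is a self-contained sketch that gathers the question's definitions, adds the imports they need, and supplies one plausible runner. The runner is our assumption: it emits every Put, feeds one input per Get, and stops when the input runs out or the processor halts. With it, the *** example above runs as shown; the default (***) from base is built from first (that is, bypass) and arr.

```haskell
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hughes' composition rules, as in the question.
instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab      = Put c (bc . ab)
  Get bbc . Put b ab = bbc b . ab
  Get bbc . Get aab  = Get $ \a -> Get bbc . aab a
  Get _ . Halt       = Halt
  Halt . _           = Halt

-- bypass: queue whichever half of the pair the processor is not ready for yet.
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp)       = Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass _ _ Halt               = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \a -> Put (f a) (arr f)
  first = bypass [] []

-- Hypothetical runner (not from the thread): emit Puts, consume one input per Get.
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) xs    = b : runStreamProcessor sp xs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) []       = []
runStreamProcessor Halt _           = []

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

sumTwo :: StreamProcessor Int Int
sumTwo = Get $ \x -> Get $ \y -> Put (x + y) sumTwo
```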

Since this works, mapA2 clearly works too, which means that mapA is not in trouble merely because Gets and Puts don't always line up. mapA just needs to do some synchronizing.


So how should mapA work? Let's consider what it would mean to write mapA putOnes. One might think that it should simply be a Put, especially considering that putOnes contains only Puts. But if it doesn't have a Get, then what is contained in that Put? It seems that regardless of the input, the output of runStreamProcessor (mapA putOnes) anyOldList would be an infinite list of infinite lists of ones! This is definitely not the desired behavior of mapA, but that's because mapA is only intended to be run with synchronous signal functions. In the synchronous case, every input produces exactly one output, which means we morally always start with a Get, and any signal functions beyond the length of the input list can safely be ignored.

This leaves us with two options for a "sensible" mapSP.

  • First, we can pre-define the length of the list. If we know the lengths statically, then this is equivalent to using *** and writing the list as nested pairs (much like I did with mapA2 above).
  • Second, we can force a bit of synchronicity into the system by making mapSP always start with a Get and using the length of the input list to determine what to do. From there, we proceed by caching inputs, much like how bypass works:
{-# LANGUAGE ViewPatterns #-}
mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP sp = go (repeat (sp, []))
  where
    -- We keep track of each SP along with its own personal queue of inputs.
    -- We always start with a Get, to determine how many outputs to produce.
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)

    addA a (s, lst) = (s, lst++[a])

    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing -> go (sps ++ rest)

    stepPuts' [] = Just []
    stepPuts' ((Get f, a:as):rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as):rest) = ((x, (s, as)):) <$> stepPuts' rest
    stepPuts' _ = Nothing
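To run mapSP end to end, here is a self-contained sketch. It repeats mapSP unchanged, adds the ViewPatterns pragma its (unzip -> ...) pattern relies on, and supplies a few assumptions of ours. runStreamProcessor is a hypothetical runner that emits Puts and consumes one input per Get. identitySP is the identity processor spelled out. delay is assumed to emit its argument once and then pass inputs through; it is modelled on Hughes' delay, not copied from his paper.

```haskell
{-# LANGUAGE ViewPatterns #-}

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hypothetical runner: emit Puts, consume one input per Get, stop at end of input.
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) xs    = b : runStreamProcessor sp xs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) []       = []
runStreamProcessor Halt _           = []

-- The answer's mapSP, unchanged apart from layout.
mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP sp = go (repeat (sp, []))
  where
    -- Each column's processor carries its own queue of pending inputs.
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
    addA a (s, lst) = (s, lst ++ [a])
    -- Keep emitting rows while every active column can produce a value.
    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing                    -> go (sps ++ rest)
    stepPuts' []                       = Just []
    stepPuts' ((Get f, a : as) : rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as) : rest)   = ((x, (s, as)) :) <$> stepPuts' rest
    stepPuts' _                        = Nothing

-- Identity processor.
identitySP :: StreamProcessor a a
identitySP = Get $ \x -> Put x identitySP

-- Assumed delay: emit x first, then behave as the identity.
delay :: b -> StreamProcessor b b
delay x = Put x identitySP

putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice
```

Under these assumptions, the two runs below reproduce the outputs shown in this answer for mapSP (delay 0) and mapSP putTwice.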

Note that there are a lot of open choices in how this was written, most of which come down to how much asynchrony we want to keep around. We can explore this by looking at a few examples. Let's start by lifting one from Hughes' paper:

> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,0]]

Hmm, in Hughes' paper, mapA (delay 0) acted as a "column delayer", producing a list like [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]], but this mapSP does not. Why? It's because of the Just case in stepPuts, which recurs into stepPuts again instead of waiting for the next input. The idea here is that if we have a StreamProcessor that produces multiple outputs per input, perhaps we don't want mapSP to delay those outputs. For instance:

putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2,3],[4,5],[4,5],[6],[6],[7,8],[7,8],[9,10,11],[9,10,11],[12,13,14,15],[12,13,14,15]]
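The same recursive Just case has a sharper consequence for a processor that never Gets, such as putOnes. Unlike mapA, mapSP does get past its first Get. After that, however, a single input list is answered with an unbounded stream of rows, much like the "infinite list of infinite lists of ones" discussed earlier. Here is a self-contained sketch that shows this. runStreamProcessor is a hypothetical runner (emit Puts, one input per Get), and laziness lets us take a finite prefix.

```haskell
{-# LANGUAGE ViewPatterns #-}

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hypothetical runner: emit Puts lazily, consume one input per Get.
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) xs    = b : runStreamProcessor sp xs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) []       = []
runStreamProcessor Halt _           = []

-- The answer's first (asynchronous) mapSP.
mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP sp = go (repeat (sp, []))
  where
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
    addA a (s, lst) = (s, lst ++ [a])
    -- The Just case loops back into stepPuts, so all-Put columns never yield control.
    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing                    -> go (sps ++ rest)
    stepPuts' []                       = Just []
    stepPuts' ((Get f, a : as) : rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as) : rest)   = ((x, (s, as)) :) <$> stepPuts' rest
    stepPuts' _                        = Nothing

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

-- One input row, but the output never stops; only a prefix is inspected.
firstRows :: [[Int]]
firstRows = take 3 (runStreamProcessor (mapSP putOnes) [[(), ()]])
```

Here firstRows is [[1,1],[1,1],[1,1]]. Without the take, the run would keep producing [1,1] forever from that single input.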

If we wanted a more synchronous semantics, we could change the definition of stepPuts to:

    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
      -- Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing -> go (sps ++ rest)
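Here is a self-contained sketch of this more synchronous variant, so that the runs below can be reproduced. mapSPSync is a hypothetical name for the modified mapSP. runStreamProcessor is a hypothetical runner (emit Puts, one input per Get). delay is assumed to emit its argument and then act as the identity. The only change from the first version is that the Just case returns to go, so each input list yields at most one output row.

```haskell
{-# LANGUAGE ViewPatterns #-}

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hypothetical runner: emit Puts, consume one input per Get, stop at end of input.
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) xs    = b : runStreamProcessor sp xs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) []       = []
runStreamProcessor Halt _           = []

-- mapSP with the synchronous stepPuts: after one row, go back to waiting for input.
mapSPSync :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSPSync sp = go (repeat (sp, []))
  where
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
    addA a (s, lst) = (s, lst ++ [a])
    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
      Nothing                    -> go (sps ++ rest)
    stepPuts' []                       = Just []
    stepPuts' ((Get f, a : as) : rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as) : rest)   = ((x, (s, as)) :) <$> stepPuts' rest
    stepPuts' _                        = Nothing

identitySP :: StreamProcessor a a
identitySP = Get $ \x -> Put x identitySP

-- Assumed delay: emit x first, then behave as the identity.
delay :: b -> StreamProcessor b b
delay x = Put x identitySP

putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes
```

With these definitions, the two runs below match the outputs shown next. As an extra check, mapSPSync putOnes now produces exactly one row per input row instead of an unbounded stream.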

Now, we recover something like Hughes' more synchronous mapA semantics:

> runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
> runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
[[1,2,3],[1,2],[4],[4,5],[6,5,3],[6,8,11,15]]
