Note: If you haven't come across asynchronous circuits yet, my two other Stack Overflow posts on them (in addition to John Hughes' article, where he describes them) may come in handy: "An ArrowCircuit instance for stream processors which could block" and "Hughes' Fibonacci stream".
In his well-known paper "Generalising Monads to Arrows", John Hughes came up with the following type for asynchronous circuits:
    import Prelude hiding (id, (.))
    import Control.Category
    import Control.Arrow

    data StreamProcessor a b = Get (a -> StreamProcessor a b)
                             | Put b (StreamProcessor a b)
                             | Halt

    instance Category StreamProcessor where
        id = Get (\ x -> Put x id)

        Put c bc . ab = Put c (bc . ab)                      {- 1 -}
        Get bbc . Put b ab = (bbc b) . ab                    {- 2 -}
        Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a) {- 3 -}
        Get bbc . Halt = Halt                                {- 4 -}
        Halt . ab = Halt                                     {- 5 -}

    bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
    bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
    bypass (b : bs) [] (Get f) = bypass bs [] (f b)
    bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
    bypass bs [] (Put c sp) =
        Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
    bypass bs ds Halt = Halt

    instance Arrow StreamProcessor where
        arr f = Get $ \ a -> Put (f a) (arr f)
        first = bypass [] []

    instance ArrowChoice StreamProcessor where
        left (Put b ab) = Put (Left b) (left ab)
        left (Get aab) = Get $ \ a -> case a of
            Left a' -> (left . aab) a'
            Right d -> Put (Right d) (left $ Get aab)
        left Halt = Halt
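For experimenting with these definitions, something has to run a StreamProcessor over a finite list of inputs, and neither post defines one (the answer below calls it runStreamProcessor). The following is a minimal, self-contained sketch: the question's type and instances reindented as one module, plus a hypothetical runStreamProcessor that feeds inputs to Gets, collects Puts, and stops on Halt or when a Get finds no input left. delay is an assumed helper modelled on Hughes' delay, not copied from his paper.

```haskell
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

-- The question's type and instances, reindented so that they load as one module.
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab = Put c (bc . ab)                 -- rule 1
  Get bbc . Put b ab = bbc b . ab                 -- rule 2
  Get bbc . Get aab = Get $ \a -> Get bbc . aab a -- rule 3
  Get _ . Halt = Halt                             -- rule 4
  Halt . _ = Halt                                 -- rule 5

-- Invariant: at most one of the two queues is non-empty; the equations only cover that case.
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) = Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass _ _ Halt = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \a -> Put (f a) (arr f)
  first = bypass [] []

instance ArrowChoice StreamProcessor where
  left (Put b ab) = Put (Left b) (left ab)
  left (Get aab) = Get $ \a -> case a of
    Left a' -> left (aab a')
    Right d -> Put (Right d) (left (Get aab))
  left Halt = Halt

-- Hypothetical runner: feed inputs to Gets, collect Puts, and stop on Halt
-- or when a Get finds no input left.
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) inputs = b : runStreamProcessor sp inputs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) [] = []
runStreamProcessor Halt _ = []

-- Assumed delay in the spirit of Hughes' paper: emit x first, then pass inputs through.
delay :: b -> StreamProcessor b b
delay x = Put x id
```

Loaded in GHCi, runStreamProcessor (first (arr (+ 1))) [(1, 'a'), (2, 'b')] should give [(2,'a'),(3,'b')], with bypass pairing each output with the second component it queued.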
In his other paper ("Programming with Arrows") he wrote down a combinator that does for arrows (well, ArrowChoices) what mapM does for Monads:
    biteOff :: [a] -> Either [a] (a, [a])
    biteOff [] = Left []
    biteOff (x : xs) = Right (x, xs)

    mapA :: ArrowChoice k => k a b -> k [a] [b]
    mapA k = let
            go :: ArrowChoice k => k a b -> k (a, [a]) [b]
            go k = k *** mapA k >>^ uncurry (:)
        in arr biteOff >>> (arr (const []) ||| go k)
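As a quick sanity check that mapA really is the arrow analogue of mapM, the sketch below runs it on two standard ArrowChoice instances from base: plain functions, where it acts like map, and Kleisli Maybe, where a single failure fails the whole list. safeRecip, incrementAll and recipAll are hypothetical example names.

```haskell
import Control.Arrow

biteOff :: [a] -> Either [a] (a, [a])
biteOff [] = Left []
biteOff (x : xs) = Right (x, xs)

-- The question's mapA, unchanged except for layout.
mapA :: ArrowChoice k => k a b -> k [a] [b]
mapA k =
  let go :: ArrowChoice k => k a b -> k (a, [a]) [b]
      go k' = k' *** mapA k' >>^ uncurry (:)
  in arr biteOff >>> (arr (const []) ||| go k)

-- Ordinary functions form an ArrowChoice, so here mapA behaves like map.
incrementAll :: [Int] -> [Int]
incrementAll = mapA (+ 1)

-- Hypothetical effectful arrow: a reciprocal that fails on zero.
safeRecip :: Double -> Maybe Double
safeRecip 0 = Nothing
safeRecip x = Just (1 / x)

-- With Kleisli Maybe, mapA behaves like mapM: one failure fails the whole list.
recipAll :: [Double] -> Maybe [Double]
recipAll = runKleisli (mapA (Kleisli safeRecip))
```

For example, recipAll [1, 2, 4] should be Just [1.0,0.5,0.25], and recipAll [1, 0, 2] should be Nothing.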
Long story short, it does not work on StreamProcessors:
    GHCi> Put x sp = Put [1, 2, 3] Halt >>> mapA id
    GHCi> x
    *** Exception: stack overflow
    GHCi> Put x sp = Put [] Halt >>> mapA id
    GHCi> x
    *** Exception: stack overflow
Interestingly enough, it is not just the value that cannot be computed, but also the constructor itself:
    GHCi> Get f = Put [1, 2, 3] Halt >>> mapA id
    GHCi> Put x sp = f ()
    GHCi> x
    *** Exception: stack overflow
So, here is how I understand what is happening:
StreamProcessor has pretty cumbersome rules for composition. To compose two arrows, we often have to know both constructors. Therefore, when we stumble into an infinite series of compositions, we just have to hope that only the 1st and the 5th rules for (.) get used, since those are the only ones that do not inspect the right-hand operand. We aren't that lucky:
1. mapA k = arr biteOff >>> (arr (const []) ||| go k).
2. arr biteOff >>> (arr (const []) ||| go k) = (arr (const []) ||| go k) . arr biteOff.
3. arr (const []) ||| go k = left (arr (const [])) >>> arr mirror >>> left (go k) >>> arr mirror >>> arr untag. See the default definitions of (|||) and (+++), with their helpers mirror and untag, in the source of Control.Arrow.
4. >>> is infixr. Hence: left (arr (const [])) >>> (arr mirror >>> (left (go k) >>> (arr mirror >>> arr untag))).
5. arr mirror >>> arr untag is a Get, since both arr mirror and arr untag are Gets (rule {- 3 -} for (.)).
6. left (go k) >>> (arr mirror >>> arr untag) = (arr mirror >>> arr untag) . left (go k) = Get ... . left (go k). Rules {- 2 -} through {- 4 -} apply here, so we now need to pattern-match left (go k). A glance at left tells us that this, in turn, requires pattern-matching go k.
7. go k = k *** mapA k >>^ uncurry (:).
8. k *** mapA k >>^ uncurry (:) = first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)).
9. first k >>> arr swap >>> first (mapA k) >>> arr swap >>> arr (uncurry (:)) = ... . (first (mapA k) . ...). So first (mapA k) turns out to be the first argument of some right-to-left composition (since . is infixr), and therefore it needs to be pattern-matched. A quick glance at first (that is, bypass) shows that we need to pattern-match mapA k.
10. Start over.
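The key claim above, that a self-referential composition is only safe when rules {- 1 -} and {- 5 -} are the ones that fire, can be checked in isolation. This sketch assumes the question's type and Category instance plus a hypothetical runStreamProcessor runner; lucky and unlucky are made-up names. lucky only ever needs rules 1 and 5, while unlucky puts a Get on the left, so rules 2 to 4 have to inspect the very value being defined.

```haskell
import Prelude hiding (id, (.))
import Control.Category

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab = Put c (bc . ab)                 -- rule 1
  Get bbc . Put b ab = bbc b . ab                 -- rule 2
  Get bbc . Get aab = Get $ \a -> Get bbc . aab a -- rule 3
  Get _ . Halt = Halt                             -- rule 4
  Halt . _ = Halt                                 -- rule 5

-- Hypothetical runner (not from the post).
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) inputs = b : runStreamProcessor sp inputs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) [] = []
runStreamProcessor Halt _ = []

-- Rules 1 and 5 never inspect the right-hand operand, so this self-reference is harmless:
--   Put 'x' Halt . lucky  =  Put 'x' (Halt . lucky)   -- rule 1
--                         =  Put 'x' Halt             -- rule 5
lucky :: StreamProcessor () Char
lucky = Put 'x' Halt . lucky

-- Here the left operand is a Get (id), so rules 2 to 4 must first learn which
-- constructor `unlucky` has, which is the very thing being defined. Evaluating it
-- never yields a constructor (GHC may report <<loop>>), so it is defined but never run.
unlucky :: StreamProcessor () ()
unlucky = id . unlucky
```

runStreamProcessor lucky [] should evaluate to "x". Forcing unlucky, by contrast, never produces a constructor, which is in miniature the pattern the steps above describe for mapA.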
So, since this didn't quite work out, I thought about writing some mapSP for StreamProcessors only. Yet it doesn't seem to be a great idea either: say we applied mapSP to some sp, and the list [a1, a2] is the input. The copy of sp that receives a1 blocks (it needs more input before it can produce anything), while the copy that receives a2 goes on and produces output. I see no way to deal with this.
So, is my understanding of why the generic mapA does not work for StreamProcessors right? Is there some mapSP :: StreamProcessor a b -> StreamProcessor [a] [b] which would actually make sense, not just type check?
I don't think you're going to get a mapA as general as you want, but I'm also not sure your reasoning for why not is correct. As I see it, the reason mapA can't work is that StreamProcessor is strict in its shape, and mapA as it's defined requires it to be lazy in its shape. On the other hand, the situation you describe, with one part blocking while another continues, is actually fine.
To see why this "length-2 list" example is really not a problem, we need only look as far as bypass. After all, mapA f specialized to a length-2 list is no different from:
    mapA2 :: Arrow k => k a b -> k [a] [b] -- These lists must have length 2!
    mapA2 f = arr (\[a,b] -> (a,b)) >>> (f *** f) >>> arr (\(a,b) -> [a,b])
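To make the "no different from" claim concrete, this sketch compares mapA2 with the question's mapA on length-2 lists, using plain functions and a nondeterministic Kleisli arrow over the list monad, where any difference in the order of effects would show up. plusMinus is a hypothetical example arrow; everything else comes from base's Control.Arrow or the posts.

```haskell
import Control.Arrow

biteOff :: [a] -> Either [a] (a, [a])
biteOff [] = Left []
biteOff (x : xs) = Right (x, xs)

-- The question's general mapA.
mapA :: ArrowChoice k => k a b -> k [a] [b]
mapA k =
  let go :: ArrowChoice k => k a b -> k (a, [a]) [b]
      go k' = k' *** mapA k' >>^ uncurry (:)
  in arr biteOff >>> (arr (const []) ||| go k)

-- The answer's mapA2: only defined for lists of length exactly 2.
mapA2 :: Arrow k => k a b -> k [a] [b]
mapA2 f = arr (\[a, b] -> (a, b)) >>> (f *** f) >>> arr (\(a, b) -> [a, b])

-- Hypothetical nondeterministic arrow: each x may become x or its negation.
plusMinus :: Kleisli [] Int Int
plusMinus = Kleisli (\x -> [x, negate x])
```

Both runKleisli (mapA2 plusMinus) [1, 2] and runKleisli (mapA plusMinus) [1, 2] should give [[1,2],[1,-2],[-1,2],[-1,-2]]: the first element's effects come first in both.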
Now, let's construct a couple of StreamProcessors, one that's a Get and one that's a Put:
    putOnes :: StreamProcessor a Int
    putOnes = Put 1 putOnes

    sumTwo :: StreamProcessor Int Int
    sumTwo = Get $ \x -> Get $ \y -> Put (x+y) sumTwo
And it's no problem to combine them with ***:
    > runStreamProcessor (putOnes *** sumTwo) (zip (repeat 1) [1..10])
    [(1,3),(1,7),(1,11),(1,15),(1,19)]
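To reproduce this, the sketch below assumes the question's type, Category instance, bypass and Arrow instance, plus a hypothetical runStreamProcessor that stops when the input runs out. Since the instance defines only arr and first, *** here is Control.Arrow's default method, which is built from first and therefore from bypass.

```haskell
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab = Put c (bc . ab)
  Get bbc . Put b ab = bbc b . ab
  Get bbc . Get aab = Get $ \a -> Get bbc . aab a
  Get _ . Halt = Halt
  Halt . _ = Halt

-- The question's bypass: queues whichever side is ahead.
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) = Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass _ _ Halt = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \a -> Put (f a) (arr f)
  first = bypass [] []

-- Hypothetical runner (not from the post).
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) inputs = b : runStreamProcessor sp inputs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) [] = []
runStreamProcessor Halt _ = []

-- The answer's two processors.
putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

sumTwo :: StreamProcessor Int Int
sumTwo = Get $ \x -> Get $ \y -> Put (x + y) sumTwo

-- No *** is defined above, so this uses Control.Arrow's default (built from first).
example :: [(Int, Int)]
example = runStreamProcessor (putOnes *** sumTwo) (zip (repeat (1 :: Int)) [1 .. 10])
```

bypass is what does the synchronizing here: first sumTwo emits only after every second input, and it pairs each sum with the oldest second component still waiting in its queue.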
If this is okay, then clearly mapA2 works, which means that mapA is not in trouble just because Gets and Puts don't always match up. mapA just needs to do some synchronizing.
So how should mapA work? Let's consider what it would mean to write mapA putOnes. One might think that it should simply be a Put, especially considering that putOnes contains only Puts. But if it doesn't have a Get, then what is contained in that Put? It seems that regardless of the input, the output of runStreamProcessor (mapA putOnes) anyOldList will be an infinite list of infinite lists of ones! This is definitely not the desired behavior of mapA, but that's because mapA is only intended to be run with synchronous signal functions. In the synchronous case, there is always one input to one output, which means we morally always start with a Get, and any signal functions beyond the length of the input list can safely be ignored.
This leaves us with two ways to get a "sensible" mapSP:
1. By using *** and writing the list as nested pairs (much like I did with mapA2 above).
2. By having mapSP always start with a Get and using the length of the input list to determine what to do. From there, we proceed by caching inputs, much like how bypass works:

    -- Needs the ViewPatterns extension (for the unzip -> ... pattern).
    mapSP :: StreamProcessor a b -> StreamProcessor [a] [b]
    mapSP sp = go (repeat (sp, []))
      where
        -- We keep track of each SP along with its own personal queue of inputs.
        -- We always start with a Get, to determine how many outputs to produce.
        go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
        addA a (s, lst) = (s, lst++[a])
        stepPuts sps rest = case stepPuts' sps of
          Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
          Nothing -> go (sps ++ rest)
        stepPuts' [] = Just []
        stepPuts' ((Get f, a:as):rest) = stepPuts' ((f a, as) : rest)
        stepPuts' ((Put x s, as):rest) = ((x, (s, as)):) <$> stepPuts' rest
        stepPuts' _ = Nothing
Note that there are a lot of open choices in how this was written, most of which come down to how much asynchrony we want to keep around. We can explore this by looking at a few examples. Let's start by lifting one from Hughes' paper:
    > runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[0,0,0],[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,0]]
Hmm, in Hughes' paper, mapA (delay 0) acted as a "column delayer", producing a list like [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]], but this mapSP did not. Why? It's because of the first case in stepPuts, which tells us to recur and call stepPuts again. The idea here is that if we have a StreamProcessor that produces multiple outputs per input, perhaps we don't want the mapSP to delay those outputs. For instance:
    putTwice :: StreamProcessor a a
    putTwice = Get $ \x -> Put x $ Put x putTwice

    > runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[1,2,3],[1,2,3],[4,5],[4,5],[6],[6],[7,8],[7,8],[9,10,11],[9,10,11],[12,13,14,15],[12,13,14,15]]
If we wanted a more synchronous semantics, we could change the definition of stepPuts to:
    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps')) -> Put xs $ go (sps' ++ rest)
      -- Just (unzip -> (xs, sps')) -> Put xs $ stepPuts sps' rest
      Nothing -> go (sps ++ rest)
Now, we recover something like Hughes' more synchronous mapA semantics:
    > runStreamProcessor (mapSP (delay 0)) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[0,0,0],[1,2],[4],[6,5],[7,8,3],[9,10,11,0]]
    > runStreamProcessor (mapSP putTwice) [[1,2,3],[4,5],[6],[7,8],[9,10,11],[12,13,14,15]]
    [[1,2,3],[1,2],[4],[4,5],[6,5,3],[6,8,11,15]]
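The four outputs above can be reproduced with a self-contained sketch. It assumes the question's type and Category instance, a hypothetical runStreamProcessor that stops when the input runs out, and delay x = Put x id; the only change to the answer's mapSP is a hypothetical eager flag that selects between the two stepPuts variants (True is the original, False the more synchronous one).

```haskell
{-# LANGUAGE ViewPatterns #-}
import Prelude hiding (id, (.))
import Control.Category

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab = Put c (bc . ab)
  Get bbc . Put b ab = bbc b . ab
  Get bbc . Get aab = Get $ \a -> Get bbc . aab a
  Get _ . Halt = Halt
  Halt . _ = Halt

-- Hypothetical runner (not from the post).
runStreamProcessor :: StreamProcessor a b -> [a] -> [b]
runStreamProcessor (Put b sp) inputs = b : runStreamProcessor sp inputs
runStreamProcessor (Get f) (x : xs) = runStreamProcessor (f x) xs
runStreamProcessor (Get _) [] = []
runStreamProcessor Halt _ = []

-- Assumed delay in the spirit of Hughes' paper; putTwice and putOnes as in the answer.
delay :: b -> StreamProcessor b b
delay x = Put x id

putTwice :: StreamProcessor a a
putTwice = Get $ \x -> Put x $ Put x putTwice

putOnes :: StreamProcessor a Int
putOnes = Put 1 putOnes

-- The answer's mapSP; the hypothetical `eager` flag picks the stepPuts variant
-- (True: keep draining Puts, False: go back to a Get after each output list).
mapSPWith :: Bool -> StreamProcessor a b -> StreamProcessor [a] [b]
mapSPWith eager sp = go (repeat (sp, []))
  where
    -- Each SP carries its own queue of pending inputs; always start with a Get.
    go sps = Get $ \as -> stepPuts (zipWith addA as sps) (drop (length as) sps)
    addA a (s, lst) = (s, lst ++ [a])
    stepPuts sps rest = case stepPuts' sps of
      Just (unzip -> (xs, sps'))
        | eager -> Put xs (stepPuts sps' rest)
        | otherwise -> Put xs (go (sps' ++ rest))
      Nothing -> go (sps ++ rest)
    -- Succeeds only if every active SP can Put, feeding queued inputs to Gets first.
    stepPuts' [] = Just []
    stepPuts' ((Get f, a : as) : rest) = stepPuts' ((f a, as) : rest)
    stepPuts' ((Put x s, as) : rest) = ((x, (s, as)) :) <$> stepPuts' rest
    stepPuts' _ = Nothing

mapSP, mapSPSync :: StreamProcessor a b -> StreamProcessor [a] [b]
mapSP = mapSPWith True
mapSPSync = mapSPWith False
```

putOnes also shows the difference: with the eager variant, take 3 (runStreamProcessor (mapSP putOnes) [[0, 0], [0]]) keeps producing [1,1] without ever asking for the second input list, while mapSPSync putOnes on the same input gives [[1,1],[1]], one output list per input list.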