Reputation: 684
I am currently in the process of learning pipes. While playing around with bidirectional pipes I noticed that unfold composition looks pretty similar:
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy x' x c' c m b') -> Proxy x' x c' c m r
-- instead of:
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
But we have to share the x' and x types because of the way we wire up the continuation:
(>>=) :: Monad m => Proxy a' a b' b m r -> (r -> Proxy a' a b' b m r') -> Proxy a' a b' b m r'
case p0 of
    Request x' fx -> Request x' (\x -> go (fx x))
    Respond b fb' -> fb b >>= \b' -> go (fb' b')
    ...
But that's pretty easy to get around:
import Pipes
import Pipes.Core hiding ((//>))

main :: IO ()
main = runEffect $ lhs //> rhs

infixl 4 //>
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //> f = p >>~ go
  where go x = go =<< request =<< f x

lhs :: Proxy x' x String String IO ()
lhs = each [1..10::Int] //> \i -> do
    r <- respond $ "response nr. " ++ show i
    lift . putStrLn $ "lhs: " ++ show r

rhs :: String -> Proxy String String x x' IO String
rhs x = do
    lift . putStrLn $ "rhs 1: " ++ show x
    y <- request "yield manually to upstream!"
    lift . putStrLn $ "rhs 2: " ++ show y
    return "return to upstream"
With the expected output:
rhs 1: "response nr. 1"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 2"
lhs: "return to upstream"
rhs 1: "response nr. 3"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 4"
lhs: "return to upstream"
rhs 1: "response nr. 5"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 6"
lhs: "return to upstream"
rhs 1: "response nr. 7"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 8"
lhs: "return to upstream"
rhs 1: "response nr. 9"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 10"
lhs: "return to upstream"
Best I can tell this doesn't break any laws either.
So finally here is my question: Why does Pipes use the current definition?
Upvotes: 3
Views: 103
Reputation: 34378
I believe the relevant part of the contract of (//>) is...
(p //> f) replaces each respond in p with f.
... which implies that f will handle all values received from p in the same manner. That, however, is exactly what your combinator circumvents: in your example, you alternate between sets of messages as you go through the elements of each [1..10]. To further illustrate the point, here is a slightly modified version of your code (in particular, I have picked a different name for your combinator, and used plain old (//>) immediately after each [1..10], as your combinator behaves the same in that case):
infixl 4 //>*
(//>*) :: Monad m =>
    Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //>* f = p >>~ go
  where go x = f x >>= request >>= go

src :: Monad m => Producer Int m ()
src = each [1..10]

-- The types of lhs and rhs are more restrictive than yours, but for this
-- usage pattern (and with the adjustments I made) that is not a problem.
lhs :: Show a => a -> Server String String IO ()
lhs = \i -> do
    r <- respond $ "response nr. " ++ show i
    lift . putStrLn $ "lhs: " ++ r

rhs :: String -> Client String String IO String
rhs x = do
    lift . putStrLn $ "rhs 0: Will this happen for every value?"
    lift . putStrLn $ "rhs 1: " ++ x
    y <- request "yield manually to upstream!"
    lift . putStrLn $ "rhs 2: " ++ y
    return "return to upstream"
The answer to the question I slipped in at the beginning of rhs...
GHCi> runEffect $ (src //> lhs) //>* rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 2: response nr. 4
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 2: response nr. 6
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 2: response nr. 8
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 2: response nr. 10
lhs: return to upstream
... is no. Contrast that with what happens when I wire your functions using (//>) as the outermost combinator, like this:
GHCi> runEffect $ src //> (\x -> lhs x //>* rhs)
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 2
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 4
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 6
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 8
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 0: Will this happen for every value?
rhs 1: response nr. 10
lhs: yield manually to upstream!
Instead of setting up a server that will give at most ten responses (src //> lhs), here every value gives rise to a single-response server, whose response is handled by the rhs client. Given that there is no second response to be gotten out of the server, the code in rhs after the request is never run. As a consequence, the values from src are handled uniformly. To further emphasise that, note that using your combinator to do that is unnecessary: src //> (lhs >~> void . rhs) does the same thing.
(Another thing to note is that, if we change the types of lhs and rhs back to what you had at first, we can write the pipeline just above as src //>* (\x -> lhs x //>* rhs). However, that is not the same as (src //>* lhs) //>* rhs. That is an associativity failure, and so your combinator does not give rise to a category.)
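To make the two groupings easy to compare side by side, here is a minimal sketch that logs into a Writer instead of printing from IO. The Writer base monad, the `say` helper, the three-element source, and the names `leftGrouped` and `rightGrouped` are my own choices for illustration; lhs and rhs are given the more polymorphic types from the question so that both groupings type-check.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core

-- The combinator from the question, under the name used in this answer.
infixl 4 //>*
(//>*) :: Monad m
       => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //>* f = p >>~ go
  where go x = f x >>= request >>= go

-- Assumption for this sketch: collect log lines purely instead of using IO.
type Log = Writer [String]

-- Hypothetical helper: append one line to the log from inside any Proxy.
say :: String -> Proxy a' a b' b Log ()
say s = lift (tell [s])

src :: Producer Int Log ()
src = each [1 .. 3]

-- Upstream interface left polymorphic, as in the question.
lhs :: Int -> Proxy x' x String String Log ()
lhs i = do
    r <- respond ("response nr. " ++ show i)
    say ("lhs: " ++ r)

-- Downstream interface left polymorphic, as in the question.
rhs :: String -> Proxy String String y' y Log String
rhs x = do
    say ("rhs 1: " ++ x)
    y <- request "yield manually to upstream!"
    say ("rhs 2: " ++ y)
    return "return to upstream"

-- The same three pieces, grouped to the left and to the right.
leftGrouped, rightGrouped :: [String]
leftGrouped  = execWriter (runEffect ((src //>* lhs) //>* rhs))
rightGrouped = execWriter (runEffect (src //>* (\x -> lhs x //>* rhs)))

main :: IO ()
main = do
    mapM_ putStrLn leftGrouped
    putStrLn "----"
    mapM_ putStrLn rightGrouped
    -- Associativity would require the two logs to be equal.
    print (leftGrouped == rightGrouped)
```

If the combinator were associative, the two logs would have to coincide; comparing them directly is a quick way to check the claim without reading through the interleaved output by eye.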
It also helps to clarify what is going on to replace your combinator with (>>~) (something that I'm sure you have tried in your tests):
GHCi> runEffect $ (src //> lhs) >>~ void . rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2
src //> lhs offers up to ten responses; rhs, however, only makes two requests, and so the other eight responses are left unused. To me, that suggests your combinator is best expressed as a way to make a client carry on requesting indefinitely:
-- requestForever :: Monad m => (b -> Client b' b m b') -> b -> Client b' b m r
requestForever :: Monad m =>
    (b -> Proxy b' b c' c m b') -> b -> Proxy b' b c' c m r
requestForever f = go
  where go x = f x >>= request >>= go
GHCi> runEffect $ (src //> lhs) >>~ requestForever rhs
rhs 0: Will this happen for every value?
rhs 1: response nr. 1
lhs: yield manually to upstream!
rhs 2: response nr. 2
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 3
lhs: yield manually to upstream!
rhs 2: response nr. 4
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 5
lhs: yield manually to upstream!
rhs 2: response nr. 6
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 7
lhs: yield manually to upstream!
rhs 2: response nr. 8
lhs: return to upstream
rhs 0: Will this happen for every value?
rhs 1: response nr. 9
lhs: yield manually to upstream!
rhs 2: response nr. 10
lhs: return to upstream
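Put differently, your combinator can be recovered as push composition with a looping client. The following is a small sketch of that reading; the `numbers` server, the `pairUp` client, and the Writer-based logging are hypothetical names and choices of mine, picked only to keep the example short and checkable.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core

requestForever :: Monad m
               => (b -> Proxy b' b c' c m b') -> b -> Proxy b' b c' c m r
requestForever f = go
  where go x = f x >>= request >>= go

-- The combinator from the question, restated through requestForever.
infixl 4 //>*
(//>*) :: Monad m
       => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //>* f = p >>~ requestForever f

-- Hypothetical server: offers four responses and ignores what it gets back.
numbers :: Server () Int (Writer [(Int, Int)]) ()
numbers = each [1 .. 4]

-- Hypothetical client: given one value, requests a second and logs the pair.
pairUp :: Int -> Client () Int (Writer [(Int, Int)]) ()
pairUp a = do
    b <- request ()
    lift (tell [(a, b)])

-- The client consumes two responses per run, so values are not handled uniformly.
pairs :: [(Int, Int)]
pairs = execWriter (runEffect (numbers //>* pairUp))

main :: IO ()
main = print pairs
```

Here each run of pairUp uses up two responses from the server, which is the same alternating behaviour seen with rhs above.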
Upvotes: 1