Reputation: 17786
I want to do something along the lines of ArrowChoice, but with conduits. I want to await an Either value and then pass Left values to one conduit and Right values to another, and then merge the results back into an Either stream.
Presumably this can be done by making the inner conduits like automata: turn a conduit into a function that takes an argument and returns a monadic list of outputs yielded:
newtype AutomataM i m o = Automata (i -> m (o, AutomataM i m o))
conduitStep :: Conduit i m o -> AutomataM i m [o]
The reason for the list of outputs is that a Conduit may yield 0 or more outputs for each input.
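To make the "0 or more outputs per input" point concrete, here is a minimal base-only sketch of such an automaton. It does not touch conduit at all; pairUp and stepAll are hypothetical names chosen for illustration, and the newtype is written with the recursive occurrence as AutomataM i m o.

```haskell
import Data.Functor.Identity (Identity (..))

-- The automaton type from the question (recursive occurrence fully applied).
newtype AutomataM i m o = Automata (i -> m (o, AutomataM i m o))

-- Hypothetical example: buffer one input, then emit it paired with the next.
-- Every other step yields an empty list, which is why the output type is [o].
pairUp :: Monad m => AutomataM i m [(i, i)]
pairUp = Automata awaitFirst
  where
    awaitFirst x = return ([], Automata (awaitSecond x))
    awaitSecond x y = return ([(x, y)], pairUp)

-- Feed a list of inputs one at a time and concatenate everything yielded.
stepAll :: Monad m => AutomataM i m [o] -> [i] -> m [o]
stepAll _ [] = return []
stepAll (Automata step) (i : is) = do
    (os, next) <- step i
    rest <- stepAll next is
    return (os ++ rest)
```

Running stepAll pairUp in the Identity monad over five inputs steps the automaton five times but only yields on the second and fourth step; the unpaired last input stays buffered.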
I've looked at ResumableConduit and its relatives, and presumably the answer is in there somewhere. But I can't quite see how it's done.
Upvotes: 4
Views: 439
Reputation: 31305
It's not exactly the same type signature you provided, but:
import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))
newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))
conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
  where
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p
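As a rough sketch of how two such automata could be combined to route an Either stream, the following base-only code uses a local copy of the Automata type so that it compiles without conduit. The names routeEither, feed and mapping are hypothetical helpers, not part of any library, and stopping as soon as either side returns is an assumption made to keep the example short.

```haskell
import Data.Functor.Identity (Identity (..))

-- Local copy of the Automata type so the sketch compiles without conduit.
newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

-- Send Left inputs to one automaton and Right inputs to the other,
-- re-tagging outputs. Assumption: stop as soon as either side returns.
routeEither
    :: Monad m
    => Automata a c m r
    -> Automata b d m r
    -> Automata (Either a b) (Either c d) m r
routeEither (Automata ml) (Automata mr) = Automata $ do
    (cs, l) <- ml
    (ds, r) <- mr
    return (map Left cs ++ map Right ds, next l r)
  where
    next (Left res) _ = Left res
    next _ (Left res) = Left res
    next (Right kl) (Right kr) = Right $ \e -> case e of
        -- The side that did not receive the input is re-wrapped as "paused".
        Left a  -> routeEither (kl a) (Automata (return ([], Right kr)))
        Right b -> routeEither (Automata (return ([], Right kl))) (kr b)

-- Feed a list of inputs; Nothing means the automaton still wants more input.
feed :: Monad m => [i] -> Automata i o m r -> m ([o], Maybe r)
feed inputs (Automata step) = do
    (os, cont) <- step
    case (cont, inputs) of
        (Left r, _)       -> return (os, Just r)
        (Right _, [])     -> return (os, Nothing)
        (Right k, i : is) -> do
            (os', res) <- feed is (k i)
            return (os ++ os', res)

-- Hypothetical test automaton: applies f to every input and never returns.
mapping :: Monad m => (i -> o) -> Automata i o m r
mapping f = Automata (return ([], Right go))
  where
    go i = Automata (return ([f i], Right go))
```

With conduitStep applied to two real conduits, the same routeEither would sit between them; turning the resulting Automata back into a conduit is left out here.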
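But be careful with this approach: as written, it ignores the finalizer carried by HaveOutput and the end-of-input handler carried by NeedInput, and it discards any leftovers still pending when the conduit is Done.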
There's probably a way to provide a ZipConduit abstraction, similar to ZipSource and ZipSink, that would handle this kind of problem more elegantly, but I haven't thought about it too much.
EDIT: I ended up implementing ZipConduit in conduit-extra 0.1.5. Here's a demonstration of using it, which sounds a bit like your case:
import Control.Applicative
import Data.Conduit
import Data.Conduit.Extra
import qualified Data.Conduit.List as CL
conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i
conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d
conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
  where
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)
main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink
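The demonstration merges both branches into a single String stream. If the Left/Right tags should survive on the output side, as the question asks, one option is to re-tag each branch before zipping. The sketch below assumes a current conduit release, where ZipConduit is exported from Data.Conduit and .| replaces =$=; eitherConduit and demo are hypothetical names, not library functions.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical helper (not a library function): run `l` on Left inputs and
-- `r` on Right inputs, re-tagging outputs so the stream stays an Either.
eitherConduit
    :: Monad m
    => ConduitT a c m ()
    -> ConduitT b d m ()
    -> ConduitT (Either a b) (Either c d) m ()
eitherConduit l r = getZipConduit $
    ZipConduit (lefts .| l .| CL.map Left) *>
    ZipConduit (rights .| r .| CL.map Right)
  where
    lefts  = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

-- Small pure run: Ints are shown, Doubles are doubled and then shown.
demo :: [Either String String]
demo = runConduitPure $
    CL.sourceList [Left (1 :: Int), Right 2.5, Left 3]
        .| eitherConduit (CL.map show) (CL.map (show . (* (2 :: Double))))
        .| CL.consume
```

Both branches still see every input, so each inner conduit only receives its own side because of the mapMaybe filter in front of it.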
Upvotes: 3
Reputation: 74334
There's a folk method of doing this with pipes, using "push-category" Pipes. The complete implementation comes from both this mailing list post and this Stack Overflow answer. I think it hasn't been released yet for three reasons: an effort to simplify the Pipes interface, a focus on the "sequencing" monad instance, which this method hides, and the lack of a proof that this implementation properly implements the Arrow class.
The idea is to implement a newtype Edge (demonstrated below) which is a push-based pipe with the type arguments in the right order for Category, Arrow, ArrowChoice and both Functor and Applicative over their output values. This lets you compose them into directed acyclic graphs using arrow notation. I'll run over the implementation below, but it's safe to just ignore it and use the Arrow/ArrowChoice/Applicative instances of Edge without too much concern.
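For readers who have not used ArrowChoice before, the combinators in question can be tried on plain functions, which are also an instance. This is only an illustration written against the class from base; the names labelBoth, onlyLeft and merged are made up for the example, and an Edge would be used in the same positions.

```haskell
import Control.Arrow (ArrowChoice, arr, left, (+++), (>>>), (|||))

-- Handle Left and Right inputs with separate arrows, keeping the tags.
labelBoth :: ArrowChoice p => p (Either Int Double) (Either String String)
labelBoth = arr (\i -> "int: " ++ show i) +++ arr (\d -> "double: " ++ show d)

-- Transform only the Left side; Right values pass through untouched.
onlyLeft :: ArrowChoice p => p (Either Int Double) (Either String Double)
onlyLeft = left (arr (\i -> "int: " ++ show i))

-- Merge both sides back into a single untagged output.
merged :: ArrowChoice p => p (Either Int Double) String
merged = labelBoth >>> (arr id ||| arr id)
```

Instantiating p to (->) turns each of these into an ordinary function, which is a convenient way to check what a combinator is supposed to do before wiring pipes together.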
(Edit: The best place to get this code is https://github.com/Gabriel439/Haskell-RCPL-Library)
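{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE FlexibleInstances #-} -- needed for the instances on Edge m r ()
import Prelude hiding ((.), id)
import Control.Applicative
import Pipes.Core
import Pipes.Internal (Proxy (..)) -- Request, Respond, M and Pure constructors
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow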
This is an atypical mode of using pipes and isn't exposed in the Pipes module; you must import Pipes.Core to use push. Push-based pipes look like
-- push :: a -> Proxy a' a a' a m r
and thus they demand at least one upstream value before the Proxy is allowed to run. This means the whole process needs to be "kickstarted" by passing the first value as a function call and that the leftmost push-Proxy will control the entire stream.
Given a push-based pipe we can implement Category, Arrow and ArrowChoice. The standard solution also involves the Edge newtype so that we have the type arguments in the right order for Category and Arrow:
newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }
For the Category instance, we use the "push" Category which has push as id and (<~<) as composition:
instance Monad m => Category (Edge m r) where
    id = Edge push
    Edge a . Edge b = Edge (a <~< b)
We embed functions into Edge with arr by augmenting id (i.e. push) on the downward edge. To do this we use the respond category which has the law p />/ respond == p, but jam our f into the process.
instance Monad m => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
We also use a local state transformer to store the snd half of our pairs and pass it "around" the input pipe in first:
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ hoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift (put d)
            return b
        dn c = do
            d <- lift get
            respond (c, d)
Finally, we get an ArrowChoice instance by implementing left. To do so, we split the work between the two sides: Right values are passed straight downstream with respond, while Left values are returned from bef and fed into the wrapped pipe.
instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
        bef x = case x of
            Left b -> return b
            Right d -> do
                _ <- respond (Right d)
                x2 <- request ()
                bef x2
        up () = do
            x <- request ()
            bef x
        dn c = respond (Left c)
We can use Edge to create "push-based" producers and consumers
type PProducer m r b = Edge m r () b
type PConsumer m r a = forall b . Edge m r a b
and then we'll provide Functor and Applicative instances for PProducer. The instance heads are written as Edge m r () because a partially applied type synonym cannot be used as an instance head. This goes by case analysis on the underlying Pipe, so it's a bit verbose. Essentially, however, all that happens is that we insert f into the yield slot of the Pipe.
instance Monad m => Functor (Edge m r ()) where
    fmap f (Edge k) = Edge $ \() -> go (k ()) where
        go p = case p of
            Request () ku -> Request () (\() -> go (ku ()))
            -- This is the only interesting line
            Respond b ku -> Respond (f b) (\() -> go (ku ()))
            M m -> M (m >>= \p' -> return (go p'))
            Pure r -> Pure r
Finally, Applicative is much the same except that we have to switch between running the upstream pipe to produce functions and running the downstream pipe to produce arguments.
instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL (ku ()) p2)
            Respond f ku -> goR f (ku ()) p2
            M m -> M (m >>= \p1' -> return (goL p1' p2))
            Pure r -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request () (\() -> goR f p1 (ku ()))
            Respond x ku -> Respond (f x) (\() -> goL p1 (ku ()))
            M m -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure r -> Pure r
Upvotes: 1