Reputation: 12700
I have the following types built from Data.Conduit:
```haskell
type Footers = [(ByteString, ByteString)]
type DataAndConclusion = ConduitM () ByteString IO Footers
```
The idea behind the second type is "produce a lot of `ByteString`s and, if you manage to produce all of them, return a `Footers`". The condition exists because conduits are driven by downstream: the consumer of a `DataAndConclusion` may not need all of its items, and in that case the return is never reached. That is precisely the behavior I need. But when the end of the source is reached, I would like to get the produced `Footers`. This would be useful, for example, if the `DataAndConclusion` were incrementally computing an MD5 that is only needed if downstream processed the entire message. Downstream could simply be sending the data over the network, and it makes no sense to finish computing and sending the MD5 if the socket was closed before downstream sent the last piece.
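To make the intended semantics concrete, here is a minimal, hypothetical producer of this type. The name `chunksWithFooter` and the `X-Total-Bytes` footer are made up for illustration, a running byte count stands in for the incremental MD5 so no hashing library is needed, and conduit 1.3-style `ConduitM` is assumed.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS8
import Data.Conduit

type Footers = [(ByteString, ByteString)]
type DataAndConclusion = ConduitM () ByteString IO Footers

-- Hypothetical producer: yields the chunks one by one while keeping a running
-- byte count (a stand-in for an incremental MD5). The footer is returned only
-- if downstream keeps pulling after the last chunk, i.e. consumes the whole
-- message; if downstream stops earlier, this return is never reached.
chunksWithFooter :: [ByteString] -> DataAndConclusion
chunksWithFooter = go 0
  where
    go :: Int -> [ByteString] -> DataAndConclusion
    go total [] = return [(BS8.pack "X-Total-Bytes", BS8.pack (show total))]
    go total (c : cs) = do
      yield c
      go (total + BS.length c) cs
```

For the chunks `"ab"` and `"cde"`, a consumer that drains everything lets the producer reach its return with the footer `("X-Total-Bytes", "5")`.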
So, basically, I want something with this signature to consume a `DataAndConclusion`:
```haskell
type MySink = Sink ByteString IO ()
mySink :: MySink
mySink = ...
difficultFunction :: ConduitM () a2 m r1 -> ConduitM a2 Void m r2 -> m (Maybe r1)
```
The question is: is there any way to implement `difficultFunction`? If so, how?
Upvotes: 4
Views: 414
Reputation: 27771
> This would be useful for example if the DataAndConclusions were incrementally computing an MD5 and such MD5 was only needed if the entire message was processed by the downstream
Instead of relying on the return value of the upstream conduit, in this case you could perhaps accumulate the ongoing MD5 computation in a `StateT` layer beneath `ConduitM`, and access it after running the conduit.
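A minimal sketch of that idea, assuming conduit 1.3 and transformers: the hypothetical `countingSource` folds each chunk into a running byte count held in a `StateT Int` layer beneath `ConduitM` (the count again stands in for an MD5 context), and the hypothetical `runCounting` reads the state back with `runStateT` once the pipeline finishes, however far downstream got.

```haskell
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, modify', runStateT)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS8
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical producer: instead of returning a footer, it adds the size of
-- every chunk it hands to downstream to a counter kept in StateT.
countingSource :: Monad m => [ByteString] -> ConduitM () ByteString (StateT Int m) ()
countingSource = mapM_ step
  where
    step c = do
      lift (modify' (+ BS.length c))
      yield c

-- Runs the producer into a consumer that takes at most n chunks, then returns
-- the consumed chunks together with the accumulated state.
runCounting :: Int -> [ByteString] -> IO ([ByteString], Int)
runCounting n chunks =
  runStateT (runConduit (countingSource chunks .| CL.take n)) 0
```

With `CL.take 1` the state covers only the first chunk; when downstream drains the input it covers all of them. Whether downstream actually reached the end still has to be detected separately.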
The other part of the puzzle is detecting that the producer finished first. `Sink`s can detect upstream end-of-input in their `await` calls (`await` returns `Nothing`). You could write a `Sink` that notifies you of upstream termination in its own result type, perhaps with a `Maybe`.
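For example, a sink written for this purpose can pattern-match on the `Maybe` returned by `await`. The hypothetical `takeNoticingEnd` below, a sketch assuming conduit 1.3, consumes at most `n` elements and returns `Just` the elements only if it saw upstream end-of-input.

```haskell
import Data.Conduit
import Data.Void (Void)

-- Hypothetical sink: reads at most n elements. Returns Just the elements if
-- await reported end of input (Nothing) before the limit was hit, and Nothing
-- if it stopped because of the limit. With exactly n elements upstream it
-- returns Nothing, because it never awaits past the n-th element.
takeNoticingEnd :: Monad m => Int -> ConduitM a Void m (Maybe [a])
takeNoticingEnd = go []
  where
    go _ 0 = return Nothing
    go acc n = do
      mx <- await
      case mx of
        Nothing -> return (Just (reverse acc))
        Just x -> go (x : acc) (n - 1)
```

The caveat in the comment is inherent to pull-based streaming: end-of-input is only observed by a sink that asks for one more element.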
But what if you are given a `Sink` that doesn't already do that? We would need a function like `Sink i m r -> Sink i m (Maybe r)`: "given a `Sink` that may halt early, return a new `Sink` that returns `Nothing` if upstream finishes first". But I don't know how to write that function.
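One possible shape for such a wrapper, assuming a conduit version (such as 1.3) whose `Data.Conduit` exports `fuseBothMaybe`: fuse a pass-through conduit in front of the given sink and treat the pass-through's result as the end-of-input signal. The hypothetical `noticeUpstreamEnd` returns `(Bool, r)` rather than `Maybe r`, so the sink's own result is kept either way.

```haskell
import Data.Conduit
import Data.Maybe (isJust)
import Data.Void (Void)

-- Hypothetical wrapper: pairs the sink's result with a flag telling whether
-- upstream reached end of input while the sink was running.
-- Assumes fuseBothMaybe :: ConduitT a b m r1 -> ConduitT b c m r2
--                       -> ConduitT a c m (Maybe r1, r2)
noticeUpstreamEnd :: Monad m => ConduitM i Void m r -> ConduitM i Void m (Bool, r)
noticeUpstreamEnd sink = do
  -- awaitForever yield forwards elements and returns () only once its own
  -- await sees Nothing; fuseBothMaybe reports that completion as Just ().
  (ended, r) <- fuseBothMaybe (awaitForever yield) sink
  return (isJust ended, r)
```

As with any pull-based check, a sink that stops right after the last element without awaiting again gets `False`.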
Edit: this conduit sets an `IORef` to `True` when it detects upstream termination:
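```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import Data.IORef

-- Passes every element through unchanged; when upstream signals end of
-- input (await returns Nothing), records that by writing True to the IORef.
-- (ConduitM i i IO () is the same type as the older Conduit i IO i.)
detectUpstreamClose :: IORef Bool -> ConduitM i i IO ()
detectUpstreamClose ref = conduit
  where
    conduit = do
      m <- await
      case m of
        Nothing -> liftIO (writeIORef ref True)
        Just i -> do
          yield i
          conduit
```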
`detectUpstreamClose` could be inserted in a pipeline, and the `IORef` checked afterwards.
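A small usage sketch, assuming conduit 1.3: here `detectUpstreamClose` is written equivalently with `awaitForever` (the write runs only after `awaitForever` returns, i.e. after upstream ended), and the hypothetical helper `runDetecting` places it between a list source and an arbitrary sink, then reads the `IORef`.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IORef
import Data.Void (Void)

-- Equivalent formulation: awaitForever yield forwards everything and returns
-- only when upstream is exhausted, so the write happens only in that case.
detectUpstreamClose :: IORef Bool -> ConduitM i i IO ()
detectUpstreamClose ref = awaitForever yield >> liftIO (writeIORef ref True)

-- Hypothetical helper: runs xs through the detector into the given sink and
-- returns the sink's result plus whether upstream was exhausted.
runDetecting :: [a] -> ConduitM a Void IO r -> IO (r, Bool)
runDetecting xs sink = do
  ref <- newIORef False
  r <- runConduit (CL.sourceList xs .| detectUpstreamClose ref .| sink)
  closed <- readIORef ref
  return (r, closed)
```

`runDetecting [1 .. 5] (CL.take 3)` gives `([1,2,3], False)`, because downstream stopped before the detector could see the end, while `runDetecting [1 .. 5] CL.consume` gives `([1,2,3,4,5], True)`.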
Upvotes: 2
Reputation: 63359
There should definitely be a nice solution, but I wasn't able to construct it using `ConduitM` primitives. A primitive function with a signature like

```haskell
ConduitM i a m r1 -> ConduitM a o m r2 -> ConduitM i o m (Maybe r1, r2)
```

looks like it would be a good addition to the conduit library.
Nevertheless, @danidiaz's suggestion about `StateT` led me to the following generic solution, which internally lifts the whole computation into `WriterT` in order to remember the result of the first conduit, if it is reached:
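```haskell
import Control.Monad (liftM)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Writer (runWriterT, tell)
import Data.Conduit
import Data.Monoid (Last (..))
import Data.Void (Void)

-- Runs the producer into the consumer. The producer's return value is
-- recorded with 'tell' only if the producer actually finishes, so it comes
-- back as Just r1 in that case and as Nothing if the consumer stopped first.
difficultFunction :: (Monad m)
                  => ConduitM () a2 m r1 -> ConduitM a2 Void m r2
                  -> m (r2, Maybe r1)
difficultFunction l r = liftM (fmap getLast) $ runWriterT (runConduit (l' .| r'))
  where
    -- Both sides are lifted into WriterT (Last r1) m; l' tells its result at the end.
    l' = transPipe lift l >>= lift . tell . Last . Just
    r' = transPipe lift r
```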
(untested!)
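For readers on a newer conduit: as far as I know, conduit 1.3's `Data.Conduit` exports `fuseBothMaybe :: ConduitT a b m r1 -> ConduitT b c m r2 -> ConduitT a c m (Maybe r1, r2)`, which matches the primitive wished for above. Assuming it is available, the question's original signature can be sketched without `WriterT`; this is an alternative to the approach in this answer, not a tested replacement.

```haskell
import Data.Conduit
import Data.Void (Void)

-- Sketch assuming Data.Conduit exports fuseBothMaybe. The producer's result
-- comes back as Just r1 only if the producer ran to completion while the
-- consumer was still pulling; otherwise it is Nothing.
difficultFunction :: Monad m => ConduitM () a2 m r1 -> ConduitM a2 Void m r2 -> m (Maybe r1)
difficultFunction src sink = fst <$> runConduit (fuseBothMaybe src sink)
```

With a producer that yields a few elements and then returns a value, a consumer that drains everything gets `Just` that value, while a consumer such as a single `await` gets `Nothing`.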
Upvotes: 3