dsign
dsign

Reputation: 12700

Haskell Conduit: is it possible to optionally have the result of a source?

I have the following types built from Data.Conduit:

type Footers = [(ByteString, ByteString)]

type DataAndConclusion = ConduitM () ByteString IO Footers

The idea of the second type being "produce a lot of ByteStrings, and if you can produce all of them, return a Footers". The condition is because conduits are governed by downstream functions, so the consumer of DataAndConclusion may have no need to consume all its items, and in that case the return wouldn't be reached. Which is precisely the behavior that I need. But when the end of the source is reached, I would like to have the produced Footers. This would be useful for example if the DataAndConclusions were incrementally computing an MD5 and such MD5 was only needed if the entire message was processed by the downstream (for example, downstream could be simply sending it through the network, but it doesn't make sense to finish computing and send the MD5 if the socket was closed before the last piece was sent by downstream).

So, basically I want to have something with this signature to consume a DataAndConclusions:

 type MySink = Sink ByteString IO ()

 mySink :: MySink 
 mySink = ... 

 difficultFunction :: ConduitM () a2 m r1 -> ConduitM a2 Void m r2 -> m (Maybe r1)

Question is, is there any way to implement "difficultFunction"? How?

Upvotes: 4

Views: 414

Answers (2)

danidiaz
danidiaz

Reputation: 27771

This would be useful for example if the DataAndConclusions were incrementally computing an MD5 and such MD5 was only needed if the entire message was processed by the downstream

Instead of relying on the return value of the upstream conduit, in this case perhaps you could accumulate the ongoing MD5 computation in a StateT layer beneath ConduitM, and access it after running the conduit.

The other part of the puzzle is detecting that the producer has finished first. Sinks can detect upstream end-of-input in await calls. You could write a Sink that notifies you of upstream termination in its own result type, perhaps with a Maybe.

But what if you are given a Sink that doesn't already do that? We would need a function like Sink i m r -> Sink i m (Maybe r). "Given a Sink that may halt early, return a new Sink that returns Nothing if upstream finishes first". But I don't know how to write that function.

Edit: This conduit sets an IORef to True when it detects upstream termination:

detectUpstreamClose :: IORef Bool ->  Conduit i IO i
detectUpstreamClose ref = conduit
  where
    conduit = do
        m <- await
        case m of
          Nothing -> liftIO (writeIORef ref True)
          Just i -> do
              yield i
              conduit

detectUpstreamClose could be inserted in a pipeline, and the IORef could be checked afterwards.

Upvotes: 2

Petr
Petr

Reputation: 63359

There should be definitely a nice solution, but I wasn't able to construct it using ConduitM primitives. Something with signature

ConduitM i a m r1 -> ConduitM a o m r2 -> ConduitM i o m (Maybe r1, r2)

Looks like a primitive function with this signature would be a good addition for the conduit library.

Nevertheless, @danidiaz's suggestion about StateT lead me to the following generic solution that lifts the whole computation to WriterT internally in order to remember the output of the first conduit, if it's reached:

import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Writer
import Data.Conduit
import Data.Monoid
import Data.Void

difficultFunction :: (Monad m)
                  => ConduitM () a2 m r1 -> ConduitM a2 Void m r2
                  -> m (r2, Maybe r1)
difficultFunction l r = liftM (fmap getLast) $ runWriterT (l' $$ r')
  where
    l' = transPipe lift l >>= lift . tell . Last . Just
    r' = transPipe lift r

(untested!)

Upvotes: 3

Related Questions