Alexey Raga

Reputation: 7525

Merge conduits into one

I am looking for a function that can do something similar to:

merge :: MonadIO m => [Producer m a] -> Producer m a

I had a quick look at stm-conduit. It looks similar, but I am not sure it fits my requirements. Here is my producer:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

As you can see, this conduit producer acks a message after yielding it. In a simple "single-threaded" pipeline this works well: the message makes its way to the sink and is then acked.

With stm-conduit, however, this may change. As far as I understand, the producer would not wait for the sink to consume the message. The two would run in parallel instead, so the message could be acked prematurely.

Is my understanding of stm-conduit correct?
And what would be the way to merge separate sources into one while keeping the single-stream semantics?

UPDATE: Updated the code to a real working AMQP example as requested (it may be a bit noisier, though).

UPDATE 2: I think what I am after could be an Alternative instance for conduit sources, so that I could write something like let src = src1 <|> src2. Is that possible somehow?

Upvotes: 4

Views: 536

Answers (2)

zakyggaps

Reputation: 3080

mergeSources in stm-conduit maintains a TBMChannel behind the scenes. All your Sources / Producers are first connected to the TBMChannel, and mergeSources then creates a single Source that pulls values out of the channel in FIFO order.

You can set the bound of the intermediate TBMChannel when calling mergeSources. Say you set the bound to n. If your consumer is slower than the sources, the first n values produced by the Sources will go straight into the TBMChannel and to the AmqpConn, assuming nothing blocks at the AmqpConn end (BTW, AmqpConn uses an unbounded Control.Concurrent.Chan, so it won't block). After that the TBMChannel is full, so any Source that tries to yield another value into the channel blocks. Your consumer takes values out of the combined source one by one, so processing is sequential after the first n elements.

To make sure it is sequential from the beginning you can set the bound to 1. However, that may cause some performance issues.
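
As a rough illustration of the bound argument, here is a minimal sketch that merges two toy sources with a bound of 1. It assumes a version of stm-conduit in which mergeSources takes a list of sources plus an Int bound and can be run inside ResourceT (the exact constraints differ between releases). The list sources and the name mergedValues are made up for the example and stand in for the AMQP producers.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (mergeSources)
import Data.List (sort)

-- Two toy sources standing in for the AMQP producers (hypothetical data).
mergedValues :: IO [Int]
mergedValues = runResourceT $ do
  -- The second argument is the bound of the intermediate channel.
  -- With a bound of 1 the channel holds at most one value at a time.
  merged <- mergeSources
    [ CL.sourceList [1, 2, 3]
    , CL.sourceList [10, 20, 30]
    ] 1
  runConduit (merged .| CL.consume)

main :: IO ()
main = do
  xs <- mergedValues
  -- Interleaving between the sources is not deterministic, so sort first.
  print (sort xs)
```

Note that, as far as I can tell, even with a bound of 1 a producer resumes as soon as the channel has accepted its value, which is not the same as the sink having finished with it. Whether that is close enough for the ack-after-yield pattern is worth testing against the real queue.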

Upvotes: 1

Will Sewell

Reputation: 2643

Have a look at ZipSource, which is a newtype wrapper whose Applicative instance lets you combine Sources in the way that you want.

Building on ZipSource, you can use sequenceSources to combine the Sources in a Traversable (e.g. a list) into a single Source of Traversables.

The only difference from your desired result type is that it is a Source over a Traversable of values, rather than over single values, but that shouldn't be much of an issue.
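
To make the resulting shape concrete, here is a small sketch using sequenceSources from Data.Conduit on two list-backed sources. The sources and the name zipped are invented for the example. In a real pipeline the elements would be the (Message, Envelope) pairs from the question.

```haskell
import Data.Conduit (runConduit, sequenceSources, (.|))
import qualified Data.Conduit.List as CL

-- Combine a list of sources into one source that yields a list per step.
zipped :: IO [[Int]]
zipped = runConduit $
  sequenceSources
    [ CL.sourceList [1, 2, 3]
    , CL.sourceList [10, 20, 30]
    ]
    .| CL.consume

main :: IO ()
main = zipped >>= print
-- prints [[1,10],[2,20],[3,30]]
```

One thing to keep in mind: the combined source advances in lockstep. It yields only once every underlying source has produced a value, and it stops as soon as any one of them finishes. This is zipping rather than interleaving, so a queue with no pending messages would hold back the others.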

Upvotes: 4
