Reputation: 7525
I am looking for a function that can do something similar to:
merge :: MonadIO m => [Producer m a] -> Producer m a
I had a quick look at stm-conduit
, it looks similar but I am not sure if it fits my requirements:
messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
where
loop = do
mmsg <- liftIO $ getMsg chan ack q
case mmsg of
Just (m, e) -> do
yield (m, e)
liftIO $ ackMsg chan (envDeliveryTag e) False
loop
Nothing -> loop
chan = fst $ amqpChan conn
As you can see, this conduit producer acks a message after yielding it. In a simple "single-threaded" pipeline it works well, the message makes its way to the sink and is then acked.
However with stm-conduit
this may change because, as far as I can understand, the producer would not wait for the message to be consumed by the sink, they would work in parallel instead and the message could be acked prematurely.
Is my understanding of stm-conduit
correct?
And what would be the way to merge separate sources into one to have a nice single-stream semantics?
UPDATE: Updated code to a real working AMQP example as requested (however it may be a bit noisier).
UPDATE 2: I think what I am after could be an Alternative instance for conduit sources so I could do something like let src = src1 <|> src2
. Is it possible somehow?
Upvotes: 4
Views: 536
Reputation: 3080
mergeSources
in stm-conduit
maintains a TBMChannel
behind the scene. All your Sources / Producers are first connected to the TBMChannel
, then it will create a single Source that try pulling values out from the channel FIFO.
You can set the bound of the intermediate TBMChannel
when using mergeSources
. Say you set the bound to n, then the first n values produced by all the Sources will be dumped to the TBMChannel
and the AmqpConn
immediately, assuming it's not blocked at the AmqpConn
end, and your consumer is slower than the sources (BTW AmqpConn
uses unbounded Control.Concurrent.Chan
so it won't block). After that the TBMChannel is full so anymore Sources trying to yield a value to the channel is blocked. Your consumer takes value one by one out of the combined source so it's sequential after the first n elements.
To make sure it's sequential from the beginning you can set the bound to 1, however it may cause some performance issues.
Upvotes: 1
Reputation: 2643
Have a look at ZipSource
, which is a newtype wrapper whose Applicative
lets you combine Source
s in the way that you want.
Once you have a ZipSource
, you can use zipSources
to combine the Source
s in a Traversable
(e.g. a list) into a Source
of Traversable
s.
The only difference to your desired result type is that it is a Source
over a Traversable
of values, rather than just a single value, but that shouldn't be much of an issue.
Upvotes: 4