Reputation: 2826
I was hoping to see a nondeterministic interleaving operation for sources, with a type signature like
interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)
The use case is that I have a p2p application that maintains open connections to many nodes on the network, and it is mostly just sitting around waiting for messages from any of them. When a message arrives, it doesn't care where it came from, but needs to process the message as soon as possible. In theory this kind of application (at least when used for socket-like sources) could bypass GHC's IO manager entirely and run the select
/epoll
/etc. calls directly, but I don't particularly care how it's implemented, as long as it works.
Is something like this possible with conduit? A less general but probably more feasible approach might be to write a [(k, Socket)] -> Source m (k, ByteString)
function that handles receiving on all the sockets for you.
I noticed the ResumableSource
operations in conduit, but they all seem to want to be aware of a particular Sink
, which feels like a bit of an abstraction leak, at least for this operation.
Upvotes: 5
Views: 382
Reputation: 31305
The stm-conduit package provides the mergeSources which performs something similar- though not identical- to what you're looking for. It's probably a good place to start.
Upvotes: 5
Reputation: 35089
Yes, it is possible.
You can poll a bunch of Source
s without blocking by forking threads to poll where in each thread you pair the Source
up with a Sink
that sends the output to some concurrency channel:
concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m r
... and then you define a Source
that reads from that channel:
synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source a m r
Notice that this would be no different than just forking the threads to poll the sockets themselves, but it would be useful to other users of conduit
that might want to poll other things than sockets using Source
s they defined because it's more general.
If you combined those capabilities into one function, then the overall API of the call would look something like:
poll :: (WhateverIOMonadClassItWouldWant m) => [Source a m r] -> m (Source a m r)
... but you can still throw in those k
s if you want.
Upvotes: 3