Reputation: 2671
I'm currently writing a small wrapper for NamedPipeServerStream/NamedPipeClientStream that is fully event-based, as opposed to using AsyncCallbacks.
I expose sync and async methods for pretty much everything possible (connecting/waiting for connection, writing, etc.), so if a consumer wanted to, for example, start a server instance and send a message when a client connects, they could either go the full sync route and do something like ...
var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");
or the async way like...
var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately
However, I'm struggling to find a good way to do the same for reading messages. Currently, when a message is read I fire a MessageAvailable event and pass the message in as one of the arguments.
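For illustration, the event-based read path currently looks roughly like this (just a sketch; the exact handler signature is an assumption, since the message is simply one of the event's arguments):

var server = new NamedPipeServer("helloWorld");
server.MessageAvailable += msg => Console.WriteLine("Received: " + msg);
server.Start(); // messages arrive via the event only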
I just can't come up with a proper way of implementing synchronous reads.
What I've considered is the following:
Having a GetNextMessage() sync method that retrieves the next message. Internally, this could be handled in two different ways:
I could keep an IEnumerable<Message> with all of the not-yet-consumed messages. As soon as the other side sends a message, I'd read it from the stream and store it in memory so it can later be consumed via GetNextMessage(). The advantage is that the stream is freed up pretty much as soon as the message is written, so the other side isn't blocked from sending further messages. The disadvantage is that I have absolutely no control over how many messages I'll be holding or how big they are. My IEnumerable<Message> might end up holding 10GB worth of unconsumed messages, and there's nothing I can do about it, since I can't force the consumer to retrieve them.
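Roughly, that first option would boil down to something like this inside the wrapper (only a sketch; _pending and OnMessageRead are placeholder names, and a BlockingCollection stands in for the IEnumerable<Message>):

using System.Collections.Concurrent;

// Sketch: the internal read loop drains every message off the pipe immediately
// and parks it in an unbounded queue until the consumer asks for it.
private readonly BlockingCollection<Message> _pending =
    new BlockingCollection<Message>(new ConcurrentQueue<Message>());

private void OnMessageRead(Message msg) {  // called by the internal read loop
    _pending.Add(msg);                     // nothing bounds this growth
}

public Message GetNextMessage() {
    return _pending.Take();                // blocks until a message is available
}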
Alternatively, I could only ever store one message in an internal buffer and only start reading again once that one has been consumed via GetNextMessage(). If I do that, though, the other side would be prevented from writing more messages until the previous one was consumed. To be more exact, the other side would be able to write until the stream is full, which could mean either multiple small complete messages or a single incomplete message. The incomplete-message case seems worse: between part 1 of the message being sent and the subsequent parts, the other end might disconnect and the whole message would be lost.
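The second option could be gated with a pair of semaphores, along these lines (again only a sketch with placeholder names):

using System.Threading;

// Sketch: hold at most one unconsumed message; the read loop may only issue
// the next read after GetNextMessage() has handed the previous one over.
private Message _current;
private readonly SemaphoreSlim _messageReady = new SemaphoreSlim(0, 1);
private readonly SemaphoreSlim _slotFree = new SemaphoreSlim(1, 1);

private void OnMessageRead(Message msg) {  // called by the internal read loop
    _slotFree.Wait();                      // stalls further reads while the buffer is occupied
    _current = msg;
    _messageReady.Release();
}

public Message GetNextMessage() {
    _messageReady.Wait();                  // blocks until the read loop has produced a message
    var msg = _current;
    _slotFree.Release();                   // allow the next read to start
    return msg;
}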
To make things harder, with either of the approaches above there's always the chance that the consumer is using events to receive messages (remember the event contains the received message) and therefore has no need for GetNextMessage(). I'd either need to stop sending the message in the event altogether, or find a way of not pushing the message to the internal buffer when it is consumed via the event. And while I can easily tell whether there is an event handler, there's no way of knowing whether the message is actually being handled there (consider a class that inherits from this one and listens to that event yet does nothing with it). The only real approach I can see here is to remove the message from the event and force consumers to always call GetNextMessage(), but I'm open to other ideas.
There's also another problem with either approach: I can't control the order in which messages are sent if WriteAsync() is used (or Write() is used from different threads).
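(I could presumably contain that ordering issue by funnelling every write through a single async lock, roughly as sketched below, but that feels like a separate concern. In this sketch _pipe stands for the underlying PipeStream.)

using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Sketch: serialize all writes so messages reach the pipe in the order
// the calls acquire the lock, regardless of the calling thread.
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);

public async Task WriteAsync(string message) {
    await _writeLock.WaitAsync();
    try {
        var payload = Encoding.UTF8.GetBytes(message);
        await _pipe.WriteAsync(payload, 0, payload.Length);
    } finally {
        _writeLock.Release();
    }
}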
Can anyone think of a better way of tackling this problem?
Upvotes: 3
Views: 951
Reputation: 101483
I'd suggest the following approach. Create an interface:

using System;
using System.Collections.Concurrent;

public interface ISubscription : IDisposable {
    Message NextMessage(TimeSpan? timeout = null);
}

public class Message {
}
And then implement it like this:

public class NamedPipeServer {
    public void StartAndWait() {
    }

    public ISubscription StartAndSubscribe() {
        // subscribe before starting to avoid a race between Start
        // and subscribing to MessageAvailable
        var subscription = new Subscription(this);
        StartAndWait();
        return subscription;
    }

    public ISubscription Subscribe() {
        // if the user wants to subscribe at some point after start - why not
        return new Subscription(this);
    }

    public event Action<Message> MessageAvailable;

    private class Subscription : ISubscription {
        // buffer of not-yet-consumed messages
        private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
            new ConcurrentQueue<Message>());
        private readonly NamedPipeServer _server;

        public Subscription(NamedPipeServer server) {
            // subscribe to the event
            _server = server;
            _server.MessageAvailable += OnMessageAvailable;
        }

        public Message NextMessage(TimeSpan? timeout = null) {
            // this is a blocking call
            if (timeout == null)
                return _queue.Take();
            else {
                Message tmp;
                if (_queue.TryTake(out tmp, timeout.Value))
                    return tmp;
                return null;
            }
        }

        private void OnMessageAvailable(Message msg) {
            // add to the buffer
            _queue.Add(msg);
        }

        public void Dispose() {
            // clean up: unsubscribe and release the buffer
            _server.MessageAvailable -= OnMessageAvailable;
            _queue.CompleteAdding();
            _queue.Dispose();
        }
    }
}
The client then either calls Subscribe or StartAndSubscribe:
var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();
That way, if no one subscribes, you buffer no messages. And if someone subscribes and then does not consume, it's their problem, not yours, because the buffering happens in a subscription they now own. You can also limit the size of the _queue blocking collection; then adding to it will block when the limit is reached, which in turn blocks your MessageAvailable event, but I wouldn't recommend doing that.
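For completeness, the bounded variant just passes a capacity to the BlockingCollection constructor (100 here is an arbitrary example):

// Bounded buffer: Add() blocks once 100 messages are queued, which in turn
// stalls the MessageAvailable handler (and whatever raises the event).
private readonly BlockingCollection<Message> _queue =
    new BlockingCollection<Message>(new ConcurrentQueue<Message>(), 100);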
Upvotes: 1