cogumel0
cogumel0

Reputation: 2671

NamedPipeServerStream/NamedPipeClientStream wrapper

I'm currently writing a small wrapper for NamedPipeServerStream/NamedPipeClientStream that is fully Event based as oppose to using AsyncCallbacks.

I expose sync and async methods for pretty much everything possible (connecting/waiting for connection, writing, etc) so if a consumer wanted to, for example, start a server instance and send a message when a client connects he could either go full sync route and do something like ...

var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");

or the async way like...

var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately

However I'm struggling with finding a good way to do the same for reading messages. Currently when a message is read I fire a MessageAvailable event and pass the message in as one of the arguments.

I just can't come up with a proper way of implementing synchronous reads.

What I've considered is the following:

Having a GetNextMessage() sync method that gets the message. Internally, this could be handled in two different ways:

To make things harder, in either of the approaches above there's always the chance that the consumer is using events for receiving messages (remember the event contains the message received) and therefore has no need for GetNextMessage(). I'd either need to stop sending the message in the event altogether, or find a way of not pushing the event to the internal buffer if the message is consumed via the event. And while I can easily tell whether there is an event handler or not, there's no way of knowing if the message is actually being handled there (i.e., consider a class implementing this one and listens to that event, yet does nothing with it). The only real approach I can see here is to remove the message from the event, force consumers to always call GetNextMessage(), but am open to other ideas.

There's also another problem with either of the approaches, which is the fact that I can't control the order in which the messages are sent if WriteAsync() is used (or Write() is used from different threads).

Can anyone think of a better way of tackling this problem?

Upvotes: 3

Views: 951

Answers (1)

Evk
Evk

Reputation: 101483

I'd suggest the following approach. Create interface:

public interface ISubscription : IDisposable {
    Message NextMessage(TimeSpan? timeout);
}

public class Message {

}

And then implement like that:

public class NamedPipeServer {        
    public void StartAndWait() {

    }

    public ISubscription StartAndSubscribe() {
        // prevent race condition before Start and subscribing to MessageAvailable
        var subscription = new Subscription(this);
        StartAndWait();
        return subscription;
    }

    public ISubscription Subscribe() {
        // if user wants to subscribe and some point after start - why not
        return new Subscription(this);
    }

    public event Action<Message> MessageAvailable;

    private class Subscription : ISubscription {
        // buffer
        private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
            new ConcurrentQueue<Message>());

        private readonly NamedPipeServer _server;

        public Subscription(NamedPipeServer server) {
            // subscribe to event
            _server = server;
            _server.MessageAvailable += OnMessageAvailable;
        }

        public Message NextMessage(TimeSpan? timeout) {
            // this is blocking call
            if (timeout == null)
                return _queue.Take();
            else {
                Message tmp;
                if (_queue.TryTake(out tmp, timeout.Value))
                    return tmp;
                return null;
            }
        }

        private void OnMessageAvailable(Message msg) {
            // add to buffer
            _queue.Add(msg);
        }

        public void Dispose() {
            // clean up
            _server.MessageAvailable -= OnMessageAvailable;
            _queue.CompleteAdding();
            _queue.Dispose();
        }
    }
}

Client then either calls Subscribe or StartAndSubscribe.

var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();

That way if no one subscribes - you buffer no messages. And if someone subscribes and then does not consume - it's their problem, not yours, because buffering happens in subscription they now own. You can also limit size of _queue blocking collection, then adding to it will block if limit is reached, blocking your MessageAvailable event, but I won't recommend doing that.

Upvotes: 1

Related Questions