RichardWilliams

Reputation: 85

Reactive Extensions - De-coupling each onNext call within an observable

Please go easy on me: I am still learning this topic and am really enjoying it. So here goes...

I create an observable that reads from a stream. When enough data has come in to represent an "IMyEvent" that I need to fire out, I build that type and call observer.OnNext.

This stream is a response stream from a server, where commands get sent to this server either from myself or externally, so I can read this stream and react based on this.

For each command I send, I subscribe to the observable made from this stream so that I can see whether my command completed successfully. I also subscribe to this stream externally to react to other events that may occur.

Let's pick an example. Say someone externally joins the conference on this server: the server sends data down this stream, then my Observable catches it and calls OnNext on the observer. I want to react to this and lock the conference. (Edit: what I mean by locking the environment is that I send a Lock command to the server, and the server then knows to "lock itself" so that nobody else can join the conference.)

I have a client that is always subscribed to the observable stream for the "SomeoneJoinsEvent". While executing the OnNext within this subscription, I also fire a command to lock the conference. This command then also subscribes temporarily (I use the Timeout operator) to the same observable stream, but I notice I am blocked/locked here.

I can see that while I am executing the OnNext, it doesn't continue reading the stream to watch for more IMyEvents.

So just my ideas/thoughts/brainfarts follow... Is there a special way to decouple this Observable stream so that it continually reads and fires OnNext to all its subscribers without waiting for them to complete? Or can I expose something in the middle that is safely decoupled from the main Observable stream?

  1. I was thinking I could subscribe to this Observable stream internally and create 2 new observables, but I thought I would just be propagating the problem further when I subscribe to the new observables. Maybe I am wrong here; see brainfarts 2 and 3.
  2. After reading this I also thought maybe I should subscribe to this Observable stream on a separate thread and add each IMyEvent to 2 queues. That might decouple it and allow me to subscribe to each queue separately: one queue subscription for my internal calls that wait for commands to complete, and one for my external calls. Somehow this still feels wrong too: what if I have many external subscribers to this Observable stream and one of them blocks?
  3. I think I am a little confused here. My main Observable stream is .Publish().RefCount(), so is all I need to do to create a new observable that subscribes to the main connectable observable each time I want to receive IMyEvents (this being the same as creating a queue, like brainfart 2)? The internal commands would subscribe to 1 observable waiting for the correct event, and 1 observable would be exposed externally. I haven't thought this one through or tried it yet.

Is it possible someone could help me here? I hope what I am trying to do makes sense.

Here is an example of my code...

//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyEvents()
{
    if (_myEventObservable != null)
    {
        return _myEventObservable;
    }

    // Here is one observable that reads data as it comes in off the asynchronous stream.
    _myEventObservable = Observable.Create<IMyEvent>(async observer =>
    {
        var myServerEventStream = await GetStreamFromMyServer(_myAuthenticationConfiguration, _token, _httpClientWrapper);
        var streamReaderAsyncState = _streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255);

        var currentStateDisposable = new SerialDisposable();
        var iterator = new Action<IStreamReaderAsyncState, Action<IStreamReaderAsyncState>>((state, self) =>
        {
            try
            {
                currentStateDisposable.Disposable = state.StreamObservable().Subscribe(
                    //OnNext
                    bytesRead =>
                    {
                        if (bytesRead == 0)
                        {
                            observer.OnCompleted();
                        }
                        else
                        {
                            //In here it does all the magic to put all the data together and only call OnNext on the observer when there is a complete IMyEvent.
                            //It is just a plain observer.OnNext(IMyEvent whatever event we have build up)
                            _messageParser.Parse(new DataChunk(state.Buffer, bytesRead), _token, observer);
                            self(state);
                        }
                    });
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }
        });

        var schedulerDisposable = TaskPoolScheduler.Default.Schedule(streamReaderAsyncState, iterator);
        return new CompositeDisposable(myServerEventStream, schedulerDisposable, currentStateDisposable);
    }).Publish().RefCount();

    return _myEventObservable;
}

//Just for fuller visibility on the stream reader async state, here is the class that is returned from the "_streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255)" call   
internal class StreamReaderAsyncState : IStreamReaderAsyncState
{
    private readonly IObservable<int> _readAsyncObservable;

    public StreamReaderAsyncState(Stream stream, int bufferSize)
    {
        Buffer = new byte[bufferSize];
        _readAsyncObservable = Observable.FromAsync(() => stream.ReadAsync(Buffer, 0, bufferSize));
    }

    public byte[] Buffer { get; private set; }

    public IObservable<int> StreamObservable()
    {
        return _readAsyncObservable;
    }
}


//Externally I subscribe to this like so...
MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
    //I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
    async myEvent => {
        await LockEnvironment()
    }
)

//The LockEnvironment call
//The myCommandPost is the LockEnvironment Command that is passed in.
private async Task<CommandResponse<TCommandResponseDto>> PostCommandAndWaitForEvent<TEventToWaitFor, TCommandResponseDto>(IMyCommandPost myCommandPost)
    where TEventToWaitFor : IMyEvent
{
    //So my intention here is to zip together the result of the post command with the TEventToWaitFor and return the first one. Otherwise if it takes too long it will return the result of the Timeout operator.
    return await MyEvents()
                    .OfType<TEventToWaitFor>()
                    //This myCommandPost.PostCommand... returns a Task<CommandResponse<TCommandResponseDto>>
                    .Zip(myCommandPost.PostCommand<TCommandResponseDto>().ToObservable(), (myEvent, myCommandResponse) => myCommandResponse)
                    .Timeout(new TimeSpan(0, 0, _myAuthenticationConfiguration.TimeoutToWaitForCommands), TimedOutLookingForMyEvent<TCommandResponseDto>())
                    .FirstAsync();
}

//The timeout observable
private IObservable<CommandResponse<TCommandResponseDto>> TimedOutLookingForMyEvent<TCommandResponseDto>()
{
    return Observable.Create<CommandResponse<TCommandResponseDto>>(observable =>
    {
        observable.OnNext(new CommandResponse<TCommandResponseDto>());
        return Disposable.Empty;
    });
}

Also edited here, adding what I did for the event parser...

internal class MyEventParser : IMessageParser
{
    private readonly ILogService _logService;
    private readonly IMyEventFactory _myEventFactory;
    private readonly StringBuilder _data = new StringBuilder();
    private const string EventDelimiter = "\n\n";
    private readonly Regex _eventDelimiterRegex = new Regex("\n{3,}");

    public MyEventParser(ILogService logService, IMyEventFactory myEventFactory)
    {
        _logService = logService;
        _myEventFactory = myEventFactory;
    }

    public void Parse(DataChunk dataChunk, string token, IObserver<IMyEvent> myEventObserver)
    {
        _data.Append(dataChunk);
        CleanUpEventDelimiterInData();
        var numberOfSubstrings = CountNumberOfSubstrings(EventDelimiter, _data.ToString());
        if (numberOfSubstrings == 0)
        {
            return;
        }

        var events = _data.ToString().Split(new[]{EventDelimiter}, StringSplitOptions.RemoveEmptyEntries);

        events.Take(numberOfSubstrings).Foreach(x =>
        {
            _logService.InfoFormat("MyEventParser - {0} - OnNext: \n\n{1}\n\n", token.Substring(token.Length -10), x);
            myEventObserver.OnNext(_myEventFactory.Create(x));
        });

        //Clean up data of what has already been fired.
        if (events.Count() == numberOfSubstrings)
        {
            _data.Clear();
        }
        else
        {
            _data.Clear();
            _data.Append(events.Last());
        }
    }

    private void CleanUpEventDelimiterInData()
    {
        var eventDelimitersFixed = _eventDelimiterRegex.Replace(_data.ToString(), EventDelimiter);
        _data.Clear();
        _data.Append(eventDelimitersFixed);
    }

    private int CountNumberOfSubstrings(string subString, string source)
    {
        var i = 0;
        var count = 0;
        while ((i = source.IndexOf(subString, i, StringComparison.InvariantCulture)) != -1)
        {
            i += subString.Length;
            count++;
        }

        return count;
    }
}

Thanks for all your help in advance :-)

Upvotes: 1

Views: 2155

Answers (1)

Lee Campbell

Reputation: 10783

Firstly, welcome to Rx and reactive programming. It can be confusing at first, but getting the fundamentals right will make it much easier.

First I want to quickly go over your code:

public IObservable<IMyEvent> MyEvents()
{
    if (_myEventObservable != null)
    {
        return _myEventObservable;
    }

This looks like it should be a property with a private readonly backing field. Observable sequences are lazily evaluated, so if no one subscribes, none of the code will run.
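If you want to see the lazy evaluation for yourself, here is a minimal sketch. It assumes the System.Reactive NuGet package and C# 9 top-level statements; the `connections` counter and the `events` name are made up for illustration. The body passed to Observable.Create only runs when the first subscriber arrives, and Publish().RefCount() shares that single run with later subscribers.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical counter standing in for "open the server stream".
var connections = 0;

var events = Observable.Create<int>(observer =>
{
    connections++; // runs on the first subscription, not when the sequence is built
    return Disposable.Empty;
})
.Publish()
.RefCount();

var beforeSubscribe = connections; // nothing has run yet

using var first = events.Subscribe(_ => { });
var afterFirst = connections;      // the Create body has now run once

using var second = events.Subscribe(_ => { });
var afterSecond = connections;     // shared connection, so it did not run again

Console.WriteLine($"{beforeSubscribe} {afterFirst} {afterSecond}"); // prints "0 1 1"
```

This is why eagerly building the sequence in a constructor costs nothing: no stream is opened until somebody subscribes.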

"what if I have many external subscribers to this Observable stream and one of them blocks?"

Well, then you have a badly behaved observer. Subscribers to Rx observable sequences should process their callbacks as fast as possible. This means no locking, no IO and no CPU-intensive processing. If you need to do any of this, then maybe you need to queue a message and do that work somewhere/sometime else.
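One possible way to "do that work somewhere else" is to put a scheduler between the producer and the slow handler. The sketch below assumes the System.Reactive NuGet package and C# 9 top-level statements; `source` is a hypothetical stand-in for your MyEvents sequence and the Thread.Sleep stands in for whatever slow work the subscriber does. It is an illustration of the idea, not a drop-in fix for your code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical source standing in for the MyEvents sequence.
var source = new Subject<int>();
var handled = new List<int>();
var handlerThreadId = -1;
var finished = new ManualResetEventSlim();

// EventLoopScheduler owns one dedicated thread. ObserveOn queues each
// notification for that thread, so the producer's OnNext call returns
// without waiting for the slow handler.
using var worker = new EventLoopScheduler();
using var subscription = source
    .ObserveOn(worker)
    .Subscribe(
        x =>
        {
            handlerThreadId = Environment.CurrentManagedThreadId;
            Thread.Sleep(50); // stand-in for slow work (IO, locks, heavy CPU)
            handled.Add(x);
        },
        () => finished.Set());

var producerThreadId = Environment.CurrentManagedThreadId;
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnCompleted();

finished.Wait(); // only so this demo can inspect the results
Console.WriteLine($"handled {string.Join(",", handled)}; " +
                  $"off the producer thread: {handlerThreadId != producerThreadId}");
```

The queue inside ObserveOn is unbounded, so a subscriber that is permanently slower than the producer will still need some other strategy (sampling, dropping, batching).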

It also seems that StreamReaderAsyncState doesn't do enough, and that MyEvents does too much. Maybe if you had something like this (modified from https://github.com/LeeCampbell/RxCookbook/tree/master/IO/Disk):

public static class ObservableStreamExtensions
{
    public static IObservable<byte[]> ToObservable(this Stream source, int bufferSize)
    {
        return Observable.Create<byte[]>(async (o, cts) =>
        {
            var buffer = new byte[bufferSize];
            var bytesRead = 0;
            do 
            {
                try
                {
                    bytesRead = await source.ReadAsync(buffer, 0, bufferSize);
                    if (bytesRead > 0)
                    {
                        var output = new byte[bytesRead];
                        Array.Copy(buffer, output, bytesRead);
                        o.OnNext(output);   
                    }
                }
                catch (Exception e)
                {
                    o.OnError(e);
                    return; // stop reading; nothing may follow OnError
                }
            }
            while (!cts.IsCancellationRequested && bytesRead > 0);

            if (!cts.IsCancellationRequested && bytesRead == 0)
            {
                o.OnCompleted();
            }
        });
    }
}

Then your code is reduced to:

private readonly IObservable<IMyEvent> _myEvents;
public ctor()
{
    _myEvents = Observable.Create<IMyEvent>(async observer =>
    {
        var bufferSize = 255;
        var myServerEventStream = await GetStreamFromMyServer(_myAuthenticationConfiguration, _token, _httpClientWrapper);

        var subscription = myServerEventStream
            .ToObservable(bufferSize)
            .Select(buffer=>new DataChunk(buffer, _token))
            .Subscribe(observer);

        return new CompositeDisposable(myServerEventStream, subscription);
    })
    .Publish()
    .RefCount();
}

//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyData{ get { return _myEvents; } }

Note that creating the DataChunk no longer involves an IObserver; passing one around looked like a code smell to me. Also note that you use the term "magic" in your comments (shudder). None of this should be magic. It is simply a composition of subscribers and transformations.

Next we look at what seems to be the root of your problem: locking. I don't know what "locking the environment" means; you don't appear to explain or justify it.

Perhaps what you want is the concept of an asynchronous gate. This is where you simply set a flag to specify the state you are in. Ideally this flag is also observable. Then, instead of locking the system, you can compose other incoming events/commands/messages with the state of that flag.
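Here is one way to read the gate idea, as a rough sketch rather than what the video shows. It assumes the System.Reactive NuGet package and C# 9 top-level statements; `isLocked` and `joinRequests` are hypothetical names. The flag is a BehaviorSubject, so it always has a current value, and WithLatestFrom pairs each incoming request with that value instead of blocking anything.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical names: isLocked is the observable flag (the gate),
// joinRequests stands in for incoming events or commands.
var isLocked = new BehaviorSubject<bool>(false);
var joinRequests = new Subject<string>();
var admitted = new List<string>();
var rejected = new List<string>();

// Pair every request with the most recent gate state; nothing waits.
using var subscription = joinRequests
    .WithLatestFrom(isLocked, (name, locked) => (name, locked))
    .Subscribe(request =>
    {
        if (request.locked) rejected.Add(request.name);
        else admitted.Add(request.name);
    });

joinRequests.OnNext("alice"); // gate open
isLocked.OnNext(true);        // closing the gate is just a state change
joinRequests.OnNext("bob");   // gate closed
isLocked.OnNext(false);       // reopen
joinRequests.OnNext("carol");

Console.WriteLine($"admitted: {string.Join(",", admitted)}; rejected: {string.Join(",", rejected)}");
// prints "admitted: alice,carol; rejected: bob"
```

Because the flag is itself a sequence, other parts of the system can subscribe to it as well, for example to show the locked state in a UI.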

See the video where Matt talks about async gates here (go to time 00:18:00): https://yow.eventer.com/yow-2014-1222/event-driven-user-interfaces-by-matt-barrett-and-lee-campbell-1686

Back to what I believe is your actual problem: I think you have created a deadlock.

MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
    //I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
    async myEvent => {
        await LockEnvironment()
    }
)

This code is actively blocking the producer from calling back. Rx is a free-threaded model, so what it does under the hood is (in theory) very simple. When OnNext is called, it simply loops through each subscriber and calls its callback. Here you are blocking, so not only can the producer not call the next subscriber, it can't process the next message either.

So you are listening to the MyEvents sequence and you get a SomoneJoinsEvent as, say, the 100th message. You then try to push a command that will produce an event in MyEvents. When this event is received, you will continue. However, you are blocking that message from being received. Thus you are in a deadlock.
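The synchronous dispatch described above is easy to observe with a plain Subject. This is only a sketch of the mechanism, assuming the System.Reactive NuGet package and C# 9 top-level statements; it uses a deliberately blocking Thread.Sleep and hypothetical subscribers A and B, and is not a reproduction of your code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

var events = new Subject<string>();
var log = new List<string>();

// Subscriber A blocks inside its callback.
using var a = events.Subscribe(e =>
{
    log.Add($"A start {e}");
    Thread.Sleep(100); // stand-in for waiting on something else
    log.Add($"A end {e}");
});

// Subscriber B is fast, but it is called after A on the same thread.
using var b = events.Subscribe(e => log.Add($"B got {e}"));

events.OnNext("joined");      // does not return until A and B have both finished
log.Add("producer resumed");  // only now could the producer read the next message

Console.WriteLine(string.Join(" | ", log));
// prints "A start joined | A end joined | B got joined | producer resumed"
```

If the thing subscriber A is waiting for can only arrive through a later OnNext from the same producer, that OnNext never happens, which is the deadlock.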

So now the question is back to you, what do you want this LockEnvironment to actually achieve?

EDIT:

Looking at your code, it seems overly complex. You seem to be hitting the StringBuilder a lot: adding to, querying, mutating and replacing its contents.

I think you can just use a more generalized solution to the problem.

Here is an example of how you could process a stream being read into buffers and then projected/translated into records.

var testScheduler = new TestScheduler();

//A series of bytes/chars to be treated as buffer read from a stream (10 at a time).
//  a \n\n represents a record delimiter.
var source = testScheduler.CreateColdObservable<char[]>(
    ReactiveTest.OnNext(0100, new char[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', '\n', '\n', 'h' }),
    ReactiveTest.OnNext(0200, new char[] { 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', '\n', '\n' }),
    ReactiveTest.OnNext(0300, new char[] { 'q', 'r', 's', '\n', '\n', 't', 'u', 'v', '\n', '\n' }),
    ReactiveTest.OnNext(0400, new char[] { 'w', 'x', '\n', 'y', 'z', '\n', '\n' })
);

var delimiter = '\n';
var observer = testScheduler.CreateObserver<string>();
var shared = source.SelectMany(buffer=>buffer).Publish().RefCount();
var subscription = shared
    //Where we see two '\n' values then emit the buffered values
    .Buffer(() => shared.Scan(Tuple.Create(' ',' '), (acc, cur)=>Tuple.Create(acc.Item2, cur)).Where(t => t.Item1 == delimiter && t.Item2==delimiter))
    //Remove trailing delimiters
    .Select(chunk =>
        {
            // Guard against a chunk that holds nothing but delimiters.
            while(chunk.Count > 0 && chunk[chunk.Count-1]==delimiter)
            {
                chunk.RemoveAt(chunk.Count-1);
            }
            return chunk;
        })
    //Filter out empty buffers
    .Where(chunk=>chunk.Any())
    //Translate the record to the desired output type
    .Select(chunk=>new string(chunk.ToArray()))
    .Subscribe(observer);

testScheduler.Start();
observer.Messages.AssertEqual(
    ReactiveTest.OnNext(0100, "abcdefg"),
    ReactiveTest.OnNext(0200, "hijklmnop"),
    ReactiveTest.OnNext(0300, "qrs"),
    ReactiveTest.OnNext(0300, "tuv"),
    ReactiveTest.OnNext(0400, "wx\nyz")
);

Upvotes: 3
