Reputation: 85
Please go easy with me, I am learning this topic also, and am really enjoying it. So here goes...
I create an observable reading from a stream, when enough data comes in and represents a "IMyEvent" type that I need to fire out, I will build that type and call the observer.OnNext.
This stream is a response stream from a server, where commands get sent to this server either from myself or externally, so I can read this stream and react based on this.
For each command I send, I subscribe to the observable made from this stream. I can see if my command has completed successfully. I also subscribe to this stream externally and to react on other events that may occur.
Lets pick an example, lets say someone externally joins the conference of this server, the server will send data down this stream, then my Observable catches this and calls onNext of the observer. I want to react based on this and lock the conference. Edited here... What I mean by locking the environment is that I send a Lock command to the server and the server knows to "lock itself" from allowing anyone else to join the conference.
I have a client that is always subscribing for the "SomeoneJoinsEvent" (they are subscribed to the observable stream) while executing this onNext within this subscription I also fire a command to lock the conference. This command then also subscribes temporarily(I use timeout operator) to the same observable stream, but I notice I am blocked/locked here.
I can see that when I am executing the onNext it doesn't continue reading the stream to watch for more IMyEvents.
So just my ideas/thoughts/brainfarts follow... Is there a special way I can somehow decouple this Observable stream, so it continually reads and fires OnNext to all its subscribers and doesn't wait for them to complete? Or exposing something in the middle that is safely decoupled from the main Observable stream.
Is it possible someone could help me here? I hope this makes sense what I am trying to do.
Here is an example of my code...
//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyEvents()
{
if (_myEventObservable != null)
{
return _myEventObservable;
}
// Here is one observable that reads data as it comes in off the asynchronous stream.
_myEventObservable = Observable.Create<IMyEvent>(async observer =>
{
var myServerEventStream = await GetStreamFromMyServer(_myAuthenticationConfiguration, _token, _httpClientWrapper);
var streamReaderAsyncState = _streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255);
var currentStateDisposable = new SerialDisposable();
var iterator = new Action<IStreamReaderAsyncState, Action<IStreamReaderAsyncState>>((state, self) =>
{
try
{
currentStateDisposable.Disposable = state.StreamObservable().Subscribe(
//OnNext
bytesRead =>
{
if (bytesRead == 0)
{
observer.OnCompleted();
}
else
{
//In here it does all the magic to put all the data together and only call OnNext on the observer when there is a complete IMyEvent.
//It is just a plain observer.OnNext(IMyEvent whatever event we have build up)
_messageParser.Parse(new DataChunk(state.Buffer, bytesRead), _token, observer);
self(state);
}
});
}
catch (Exception e)
{
observer.OnError(e);
}
});
var schedulerDisposable = TaskPoolScheduler.Default.Schedule(streamReaderAsyncState, iterator);
return new CompositeDisposable(myServerEventStream, schedulerDisposable, currentStateDisposable);
}).Publish().RefCount();
return _myEventObservable;
}
//Just for fuller visibility on the stream reader async state, here is the class that is returned from the "_streamReaderAsyncStateFactory.CreateStreamReadAsyncState(myServerEventStream, 255)" call
internal class StreamReaderAsyncState : IStreamReaderAsyncState
{
private readonly IObservable<int> _readAsyncObservable;
public StreamReaderAsyncState(Stream stream, int bufferSize)
{
Buffer = new byte[bufferSize];
_readAsyncObservable = Observable.FromAsync(() => stream.ReadAsync(Buffer, 0, bufferSize));
}
public byte[] Buffer { get; private set; }
public IObservable<int> StreamObservable()
{
return _readAsyncObservable;
}
}
//Externally I subscribe to this like so...
MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
//I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
async myEvent => {
await LockEnvironment()
}
)
//The LockEnvironment call
//The myCommandPost is the LockEnvironment Command that is passed in.
private async Task<CommandResponse<TCommandResponseDto>> PostCommandAndWaitForEvent<TEventToWaitFor, TCommandResponseDto>(IMyCommandPost myCommandPost)
where TEventToWaitFor : IMyEvent
{
//So my intention here is to zip together the result of the post command with the TEventToWaitFor and return the first one. Otherwise if it takes too long it will return the result of the Timeout operator.
return await MyEvents()
.OfType<TEventToWaitFor>()
//This myCommandPost.PostCommand... returns a Task<CommandResponse<TCommandResponseDto>>
.Zip(myCommandPost.PostCommand<TCommandResponseDto>().ToObservable(), (myEvent, myCommandResponse) => myCommandResponse)
.Timeout(new TimeSpan(0, 0, _myAuthenticationConfiguration.TimeoutToWaitForCommands), TimedOutLookingForMyEvent<TCommandResponseDto>())
.FirstAsync();
}
//The timeout observable
private IObservable<CommandResponse<TCommandResponseDto>> TimedOutLookingForMyEvent<TCommandResponseDto>()
{
return Observable.Create<CommandResponse<TCommandResponseDto>>(observable =>
{
observable.OnNext(new CommandResponse<TCommandResponseDto>());
return Disposable.Empty;
});
}
Also edited here, adding what I did for the event parser...
internal class MyEventParser : IMessageParser
{
private readonly ILogService _logService;
private readonly IMyEventFactory _MyEventFactory;
private readonly StringBuilder _data = new StringBuilder();
private const string EventDelimiter = "\n\n";
private readonly Regex _eventDelimiterRegex = new Regex("\n{3,}");
public MyEventParser(ILogService logService, IMyEventFactory myEventFactory)
{
_logService = logService;
_myEventFactory = myEventFactory;
}
public void Parse(DataChunk dataChunk, string token, IObserver<IMyEvent> myEventObserver)
{
_data.Append(dataChunk);
CleanUpEventDelimiterInData();
var numberOfSubstrings = CountNumberOfSubstrings(EventDelimiter, _data.ToString());
if (numberOfSubstrings == 0)
{
return;
}
var events = _data.ToString().Split(new[]{EventDelimiter}, StringSplitOptions.RemoveEmptyEntries);
events.Take(numberOfSubstrings).Foreach(x =>
{
_logService.InfoFormat("MyEventParser - {0} - OnNext: \n\n{1}\n\n", token.Substring(token.Length -10), x);
myEventObserver.OnNext(_myEventFactory.Create(x));
});
//Clean up data of what has already been fired.
if (events.Count() == numberOfSubstrings)
{
_data.Clear();
}
else
{
_data.Clear();
_data.Append(events.Last());
}
}
private void CleanUpEventDelimiterInData()
{
var eventDelimitersFixed = _eventDelimiterRegex.Replace(_data.ToString(), EventDelimiter);
_data.Clear();
_data.Append(eventDelimitersFixed);
}
private int CountNumberOfSubstrings(string subString, string source)
{
var i = 0;
var count = 0;
while ((i = source.IndexOf(subString, i, StringComparison.InvariantCulture)) != -1)
{
i += subString.Length;
count++;
}
return count;
}
}
Thanks for all your help in advance :-)
Upvotes: 1
Views: 2155
Reputation: 10783
Firstly, welcome to Rx and reactive programming. It can be confusing at first, but getting the fundamental right will make it exponentially easier.
First I want to quickly go over your code
public IObservable<IMyEvent> MyEvents()
{
if (_myEventObservable != null)
{
return _myEventObservable;
}
This looks like it should a property with a private readonly
backing field.
Observable sequences are lazily evaluated, so if no one subscribes none of the code will run.
what if I have many external subscribers to this Observable stream and one of them blocks?
Well then you have a badly-behaved observer. Subscribers to Rx Observable sequences should process their callbacks as fast as possible. This means no locking, no IO, no CPU intensive processing. If you need to do this, then maybe you need to queue a message and do that work somewhere/sometime else.
It also seems that StreamReaderAsyncState
doesn't do enough, and that MyEvents
does too much.
Maybe if you had something like this (modified from https://github.com/LeeCampbell/RxCookbook/tree/master/IO/Disk)
public static class ObservableStreamExtensions
{
public static IObservable<byte[]> ToObservable(this Stream source, int bufferSize)
{
return Observable.Create<byte[]>(async (o, cts) =>
{
var buffer = new byte[bufferSize];
var bytesRead = 0;
do
{
try
{
bytesRead = await source.ReadAsync(buffer, 0, bufferSize);
if (bytesRead > 0)
{
var output = new byte[bytesRead];
Array.Copy(buffer, output, bytesRead);
o.OnNext(output);
}
}
catch (Exception e)
{
o.OnError(e);
}
}
while (!cts.IsCancellationRequested && bytesRead > 0);
if (!cts.IsCancellationRequested && bytesRead == 0)
{
o.OnCompleted();
}
});
}
}
Then your code is reduced to
private readonly IObservable<IMyEvent> _myEvents;
public ctor()
{
_myEvents = return Observable.Create<IMyEvent>(async observer =>
{
var bufferSize = 255;
var myServerEventStream = await GetStreamFromMyServer(_pexipAuthenticationConfiguration, _token, _httpClientWrapper);
var subscription = myServerEventStream
.ToObservable(bufferSize)
.Select(buffer=>new DataChunk(buffer, _token))
.Subscribe(observer);
return new CompositeDisposable(myServerEventStream, subscription);
})
.Publish()
.RefCount();
}
//This is the observable that I create from the stream of events
public IObservable<IMyEvent> MyData{ get { return _myEvents; } }
Note that new DataChunk
now doesn't take an IObserver
which to me looks like a code smell.
Also note that you use the term "magic" in your comments shudder.
None of this should be magic.
Just simply a composition of subscribers and transformations.
Next we look at what seems to be the root of your problem: locking. I don't know what "locking the environment" means, you don't appear to explain or justify it.
Perhaps what you want to do is to have the concept of an asynchronous gate. This where you simply set a flag to specify the state you are in. Ideally this flag is also observable. Then instead of locking the system, you can compose other incoming events/commands/messages with the state of that flag.
See the video where Matt talks about async gates here (go to time 00:18:00): https://yow.eventer.com/yow-2014-1222/event-driven-user-interfaces-by-matt-barrett-and-lee-campbell-1686
Back to what I believe to be your actual problem, is that I think you have created a dead lock.
MyEvents.OfType<SomoneJoinsEvent>
.Subscribe(
//I read somewhere that I shouldn't be making this async and using a selectmany with the async, but I am unsure.
async myEvent => {
await LockEnvironment()
}
)
This code is actively blocking the producer from calling back. Rx is a free threaded model, so it is actually very simple (in theory) what it is doing under the hood. When OnNext is called, it simply loop through each subscriber and calls the its callback. Here you are blocking, so not only can the producer not call the next subscriber, it cant process the next message.
So you are listening to the MyEvents
sequence, you get a SomoneJoinsEvent
as say the 100th message.
You then try to then push a command that will produce an event in MyEvents
.
When this event is received, you will continue.
However, you are blocking that message from getting received.
Thus you are in a deadlock.
So now the question is back to you, what do you want this LockEnvironment to actually achieve?
EDIT:
Looking at your code, it seems overly complex. You seem to hitting StringBuilder a lot, adding, querying mutating and replacing its contents.
I think you can just use a more generalized solution to the problem.
Here is an example of a how you could process a stream being read into buffers and then projected/translated to records.
var testScheduler = new TestScheduler();
//A series of bytes/chars to be treated as buffer read from a stream (10 at a time).
// a \n\n represents a record delimiter.
var source = testScheduler.CreateColdObservable<char[]>(
ReactiveTest.OnNext(0100, new char[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', '\n', '\n', 'h' }),
ReactiveTest.OnNext(0200, new char[] { 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', '\n', '\n' }),
ReactiveTest.OnNext(0300, new char[] { 'q', 'r', 's', '\n', '\n', 't', 'u', 'v', '\n', '\n' }),
ReactiveTest.OnNext(0400, new char[] { 'w', 'x', '\n', 'y', 'z', '\n', '\n' })
);
var delimiter = '\n';
var observer = testScheduler.CreateObserver<string>();
var shared = source.SelectMany(buffer=>buffer).Publish().RefCount();
var subscription = shared
//Where we see two '\n' values then emit the buffered values
.Buffer(() => shared.Scan(Tuple.Create(' ',' '), (acc, cur)=>Tuple.Create(acc.Item2, cur)).Where(t => t.Item1 == delimiter && t.Item2==delimiter))
//Remove trailing delimiters
.Select(chunk =>
{
var len = chunk.Count;
while(chunk[chunk.Count-1]==delimiter)
{
chunk.RemoveAt(chunk.Count-1);
}
return chunk;
})
//Filter out empty buffers
.Where(chunk=>chunk.Any())
//Translate the record to the desired output type
.Select(chunk=>new string(chunk.ToArray()))
.Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
ReactiveTest.OnNext(0100, "abcdefg"),
ReactiveTest.OnNext(0200, "hijklmnop"),
ReactiveTest.OnNext(0300, "qrs"),
ReactiveTest.OnNext(0300, "tuv"),
ReactiveTest.OnNext(0400, "wx\nyz")
);
Upvotes: 3