Matthew Finlay
Matthew Finlay

Reputation: 3464

Using System.Reactive deserialize messages

I currently have a program that listens to a network stream and fires events when a new message has been deserialized.

while(true)
{
  byte[] lengthBytes = new byte[10];
  networkStream.Read(lengthBytes, 0, 10);
  int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
  var messageBytes = new byte[messageLength + 10];
  Array.Copy(lengthBytes, messageBytes, 10);
  int bytesReadTotal = 10;
  while (bytesReadTotal < 10 + messageLength)
    bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
  OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}

I want to rewrite this using the reactive extensions so that instead of the event there is an IObservable<Message>. This could be done using

Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
  (h) => NewMessage += h,
  (h) => NewMessage -= h)
    .Select(  (e) => { return e.Message; });

However I would prefer to rewrite the listening process using System.Reactive instead. My starting point (from here) is

Func<byte[], int, int, IObservable<int>> read;   
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);

which allows

byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
  (bytesRead) => ;
});

I'm struggling to see how to continue though. Does anyone have an implementation?

Upvotes: 2

Views: 320

Answers (2)

Gideon Engelberth
Gideon Engelberth

Reputation: 6155

Since Observable.FromAsyncPattern only makes the async call once, you will need to make a function that will call it multiple times instead. This should get you started, but probably has lots of room for improvement. It assumes that you can make the async calls repeatedly with the same arguments and assumes that the selector will handle any issues that arise from this.

Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
             begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
             [end] As Func(Of IAsyncResult, TCallResult),
             selector As Func(Of TCallResult, TResult),
             isComplete As Func(Of TCallResult, Boolean)
            ) As Func(Of T1, T2, T3, IObservable(Of TResult))
    Return Function(a1, a2, a3) Observable.Create(Of TResult)(
        Function(obs)
            Dim serial As New SerialDisposable()
            Dim fac = Observable.FromAsyncPattern(begin, [end])
            Dim onNext As Action(Of TCallResult) = Nothing
            'this function will restart the subscription and will be
            'called every time a value is found
            Dim subscribe As Func(Of IDisposable) =
                Function()
                    'note that we are REUSING the arguments, the
                    'selector should handle this appropriately
                    Return fac(a1, a2, a3).Subscribe(onNext,
                                                     Sub(ex)
                                                         obs.OnError(ex)
                                                         serial.Dispose()
                                                     End Sub)
                End Function
            'set up the OnNext handler to restart the observer 
            'every time it completes
            onNext = Sub(v)
                         obs.OnNext(selector(v))
                         'subscriber disposed, do not check for completion
                         'or resubscribe
                         If serial.IsDisposed Then Exit Sub
                         If isComplete(v) Then
                             obs.OnCompleted()
                             serial.Dispose()
                         Else
                             'using the scheduler lets the OnNext complete before
                             'making the next async call.
                             'you could parameterize the scheduler, but it may not be
                             'helpful, and it won't work if Immediate is passed.
                             Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
                         End If
                     End Sub
            'start the first subscription
            serial.Disposable = subscribe()
            Return serial
        End Function)
End Function

From here, you can get an IObservable(Of Byte) like so:

Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
                 AddressOf stream.BeginRead, AddressOf stream.EndRead,
                 Function(numRead)
                     If numRead < 0 Then Throw New ArgumentException("Invalid number read")
                     Console.WriteLine("Position after read: " & stream.Position.ToString())
                     Dim ret(numRead - 1) As Byte
                     Array.Copy(buffer, ret, numRead)
                     Return ret
                 End Function,
                 Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)

From there, you will need some sort of accumulator function that takes byte arrays and outputs complete messages when they are found. The skeleton of such a function might look like:

Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
    Return Observable.Create(Of message)(
        Function(obs)
            Dim accumulator As New List(Of Byte)
            Return source.Subscribe(
                Sub(buffer)
                    'do some logic to build a packet here
                    accumulator.AddRange(buffer)
                    If True Then
                        obs.OnNext(New message())
                        'reset accumulator
                    End If
                End Sub,
                AddressOf obs.OnError,
                AddressOf obs.OnCompleted)
        End Function)
End Function

Upvotes: 0

Tyson
Tyson

Reputation: 14734

I came up with the following, but I feel it should be possible without creating a class and using Subject<T> (e.g. via some projection of the header packet to the body packet to the message object, but the problem with that is EndRead() doesn't return the byte array, but the number of bytes read. So you need an object or atleast a closure at some point).

class Message
{
    public string Text { get; set; }
}

class MessageStream : IObservable<Message>
{
    private readonly Subject<Message> messages = new Subject<Message>();

    public void Start()
    {
        // Get your real network stream here.
        var stream  = Console.OpenStandardInput();
        GetNextMessage( stream );
    }

    private void GetNextMessage(Stream stream)
    {
        var header = new byte[10];
        var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
        read( header, 0, 10 ).Subscribe( b =>
        {
            var bodyLength = BitConverter.ToInt32( header, 0 );
            var body = new byte[bodyLength];
            read( body, 0, bodyLength ).Subscribe( b2 =>
            {
                var message = new Message() {Text = Encoding.UTF8.GetString( body )};
                messages.OnNext( message );
                GetNextMessage( stream );
            } );
        } );
    }

    public IDisposable Subscribe( IObserver<Message> observer )
    {
        return messages.Subscribe( observer );
    }
}

Upvotes: 1

Related Questions