Reputation: 3464
I currently have a program that listens to a network stream and fires events when a new message has been deserialized.
while(true)
{
byte[] lengthBytes = new byte[10];
networkStream.Read(lengthBytes, 0, 10);
int messageLength = Int32.Parse(Encoding.UTF8.GetString(lengthBytes));
var messageBytes = new byte[messageLength + 10];
Array.Copy(lengthBytes, messageBytes, 10);
int bytesReadTotal = 10;
while (bytesReadTotal < 10 + messageLength)
bytesReadTotal += networkStream.Read(messageBytes, bytesReadTotal, messageLength - bytesReadTotal + 10);
OnNewMessage(new MessageEventArgs(messageFactory.GetMessage(messageBytes)));
}
I want to rewrite this using the reactive extensions so that instead of the event there is an IObservable<Message>
. This could be done using
Observable.FromEvent<EventHandler<MessageEventArgs>, MessageEventArgs>(
(h) => NewMessage += h,
(h) => NewMessage -= h)
.Select( (e) => { return e.Message; });
However I would prefer to rewrite the listening process using System.Reactive instead. My starting point (from here) is
Func<byte[], int, int, IObservable<int>> read;
read = Observable.FromAsyncPattern<byte[], int, int, int>(
networkStream.BeginRead,
networkStream.EndRead);
which allows
byte[] lengthBytes = new byte[10];
read(lengthBytes, 0, lengthBytes.Length).Subscribe(
{
(bytesRead) => ;
});
I'm struggling to see how to continue though. Does anyone have an implementation?
Upvotes: 2
Views: 320
Reputation: 6155
Since Observable.FromAsyncPattern
only makes the async call once, you will need to make a function that will call it multiple times instead. This should get you started, but probably has lots of room for improvement. It assumes that you can make the async calls repeatedly with the same arguments and assumes that the selector
will handle any issues that arise from this.
Function FromRepeatedAsyncPattern(Of T1, T2, T3, TCallResult, TResult)(
begin As Func(Of T1, T2, T3, AsyncCallback, Object, IAsyncResult),
[end] As Func(Of IAsyncResult, TCallResult),
selector As Func(Of TCallResult, TResult),
isComplete As Func(Of TCallResult, Boolean)
) As Func(Of T1, T2, T3, IObservable(Of TResult))
Return Function(a1, a2, a3) Observable.Create(Of TResult)(
Function(obs)
Dim serial As New SerialDisposable()
Dim fac = Observable.FromAsyncPattern(begin, [end])
Dim onNext As Action(Of TCallResult) = Nothing
'this function will restart the subscription and will be
'called every time a value is found
Dim subscribe As Func(Of IDisposable) =
Function()
'note that we are REUSING the arguments, the
'selector should handle this appropriately
Return fac(a1, a2, a3).Subscribe(onNext,
Sub(ex)
obs.OnError(ex)
serial.Dispose()
End Sub)
End Function
'set up the OnNext handler to restart the observer
'every time it completes
onNext = Sub(v)
obs.OnNext(selector(v))
'subscriber disposed, do not check for completion
'or resubscribe
If serial.IsDisposed Then Exit Sub
If isComplete(v) Then
obs.OnCompleted()
serial.Dispose()
Else
'using the scheduler lets the OnNext complete before
'making the next async call.
'you could parameterize the scheduler, but it may not be
'helpful, and it won't work if Immediate is passed.
Scheduler.CurrentThread.Schedule(Sub() serial.Disposable = subscribe())
End If
End Sub
'start the first subscription
serial.Disposable = subscribe()
Return serial
End Function)
End Function
From here, you can get an IObservable(Of Byte)
like so:
Dim buffer(4096 - 1) As Byte
Dim obsFac = FromRepeatedAsyncPattern(Of Byte(), Integer, Integer, Integer, Byte())(
AddressOf stream.BeginRead, AddressOf stream.EndRead,
Function(numRead)
If numRead < 0 Then Throw New ArgumentException("Invalid number read")
Console.WriteLine("Position after read: " & stream.Position.ToString())
Dim ret(numRead - 1) As Byte
Array.Copy(buffer, ret, numRead)
Return ret
End Function,
Function(numRead) numRead <= 0)
'this will be an observable of the chunk size you specify
Dim obs = obsFac(buffer, 0, buffer.Length)
From there, you will need some sort of accumulator function that takes byte arrays and outputs complete messages when they are found. The skeleton of such a function might look like:
Public Function Accumulate(source As IObservable(Of Byte())) As IObservable(Of Message)
Return Observable.Create(Of message)(
Function(obs)
Dim accumulator As New List(Of Byte)
Return source.Subscribe(
Sub(buffer)
'do some logic to build a packet here
accumulator.AddRange(buffer)
If True Then
obs.OnNext(New message())
'reset accumulator
End If
End Sub,
AddressOf obs.OnError,
AddressOf obs.OnCompleted)
End Function)
End Function
Upvotes: 0
Reputation: 14734
I came up with the following, but I feel it should be possible without creating a class and using Subject<T>
(e.g. via some projection of the header packet to the body packet to the message object, but the problem with that is EndRead()
doesn't return the byte array, but the number of bytes read. So you need an object or atleast a closure at some point).
class Message
{
public string Text { get; set; }
}
class MessageStream : IObservable<Message>
{
private readonly Subject<Message> messages = new Subject<Message>();
public void Start()
{
// Get your real network stream here.
var stream = Console.OpenStandardInput();
GetNextMessage( stream );
}
private void GetNextMessage(Stream stream)
{
var header = new byte[10];
var read = Observable.FromAsyncPattern<byte [], int, int, int>( stream.BeginRead, stream.EndRead );
read( header, 0, 10 ).Subscribe( b =>
{
var bodyLength = BitConverter.ToInt32( header, 0 );
var body = new byte[bodyLength];
read( body, 0, bodyLength ).Subscribe( b2 =>
{
var message = new Message() {Text = Encoding.UTF8.GetString( body )};
messages.OnNext( message );
GetNextMessage( stream );
} );
} );
}
public IDisposable Subscribe( IObserver<Message> observer )
{
return messages.Subscribe( observer );
}
}
Upvotes: 1