Reputation: 595
I'm using Reactive Extensions (Rx) for the first time, on a project where performance is very important, and I've run into a problem.
I'm retrieving a large amount of data via a TCP socket, which I have to parse into objects and insert into a database. Each message has the following signature:
<payload-size> <payload>
Here payload-size is a uint32 (4 bytes) that gives the size of the payload that follows, in bytes.
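To make the size field concrete, here is a minimal sketch of decoding the 4-byte prefix. The byte order is not stated in the question, so little-endian is an assumption here, and `FrameHeader` / `ReadPayloadSize` are hypothetical names. `BinaryPrimitives` requires .NET Core 2.1+ (or the System.Memory package).

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper, not part of the original code.
public static class FrameHeader
{
    // A uint32 occupies 4 bytes on the wire.
    public const int SizeFieldLength = sizeof(uint);

    // Assumption: the size prefix is little-endian.
    // Use ReadUInt32BigEndian instead if the sender writes network byte order.
    public static uint ReadPayloadSize(ReadOnlySpan<byte> header)
    {
        if (header.Length < SizeFieldLength)
            throw new ArgumentException("Need at least 4 bytes.", nameof(header));

        return BinaryPrimitives.ReadUInt32LittleEndian(header);
    }
}
```

Whichever byte order applies, the point is that the header has a fixed length, so the reader always knows how many bytes to collect before it can learn the payload length.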
I want to use what the Reactive Framework provides to parallelize the steps shown below, so that my code maximizes throughput and does not become the bottleneck. I'm also looking for best practices for implementing this.
TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)
I've already implemented the following code, which gives me an Observable (ArraySegment<byte>):
IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));
I now want to transform the Observable (ArraySegment<byte>) into an Observable (Message). My first attempt looked roughly like the approach in the question below, because I thought I could use an observable like a stream:
Read continous bytestream from Stream using TcpClient and Reactive Extensions
Is it possible to create such an observable using the method outlined below, and if so, how? Or is there a better approach you would recommend? I would really appreciate a good example.
Note: The Observable (ArraySegment<byte>) behaves like a stream, so I do not know the size of the data it pushes to me. (Do I need to implement some kind of buffer, or can the Reactive Framework help me?)
Observable (ArraySegment<byte>)
--> Buffer(4 bytes)
--> ReadSize --> Buffer(payload-size)
--> ReadPayload
--> Parse Payload
--> (Start over)
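The steps above can be sketched without Rx as a small stateful framer that accepts chunks of arbitrary size and returns every payload that has become complete. This is only an illustration under assumptions: `LengthPrefixedFramer` and `Push` are hypothetical names, the size prefix is assumed to be little-endian, and a `List<byte>` buffer is used for clarity rather than speed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: buffers incoming chunks and splits them into
// <payload-size><payload> frames. Not thread-safe; use one per connection.
public sealed class LengthPrefixedFramer
{
    private const int SizeFieldLength = sizeof(uint); // 4-byte uint32 prefix
    private readonly List<byte> _pending = new List<byte>();

    // Appends a chunk and returns all payloads completed by it (possibly none).
    public IReadOnlyList<byte[]> Push(ArraySegment<byte> chunk)
    {
        for (int i = 0; i < chunk.Count; i++)
            _pending.Add(chunk.Array[chunk.Offset + i]);

        var complete = new List<byte[]>();
        int offset = 0;

        while (_pending.Count - offset >= SizeFieldLength)
        {
            // Assumption: little-endian size prefix.
            uint size = (uint)_pending[offset]
                      | (uint)_pending[offset + 1] << 8
                      | (uint)_pending[offset + 2] << 16
                      | (uint)_pending[offset + 3] << 24;

            long frameLength = SizeFieldLength + (long)size;
            if (_pending.Count - offset < frameLength)
                break; // payload not fully received yet

            complete.Add(_pending.GetRange(offset + SizeFieldLength, (int)size).ToArray());
            offset += (int)frameLength; // start over with the next frame
        }

        // Keep only the bytes of the incomplete trailing frame.
        _pending.RemoveRange(0, offset);
        return complete;
    }
}
```

With Rx, something like `observable.SelectMany(chunk => framer.Push(chunk))` would then yield an observable of payloads. Because the framer holds state, each TCP client would need its own instance, so the framing should happen per client before the streams are merged.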
Thanks in advance! :)
Upvotes: 3
Views: 1105
Reputation: 595
Here is the solution I ended up using. Feel free to comment with possible improvements.
// Note: ConcatdArrays, ReadHeader and ReadPayload are helpers not shown in this post.
public static IObservable<DataMessage> Convert(IObservable<ArraySegment<byte>> bytes)
{
    const int headerSize = 12; // bytes
    return bytes.Scan(
        // Seed state: no leftover bytes, no messages, no header read yet.
        new
        {
            Leftovers = new byte[0],
            Messages = new List<DataMessage>(),
            Header = (Header) null
        },
        (saved, current) =>
        {
            // Prepend the bytes left over from the previous chunk.
            var data = ConcatdArrays(saved.Leftovers, current.ToArray());
            var messages = new List<DataMessage>();
            var header = saved.Header;
            while (true)
            {
                // Header
                if (header == null && data.Count >= headerSize)
                {
                    header = ReadHeader(ref data, headerSize);
                }
                // Payload
                else if (header != null)
                {
                    var type = header.Type;
                    var size = DataItem.Size(type);
                    if (data.Count < size) break; // Still missing data
                    // Create new message with the gathered data
                    var payload = ReadPayload(ref data, size);
                    messages.Add(new DataMessage(header, payload));
                    header = null;
                }
                // Can't do more with the available data - try again next round.
                else
                {
                    break;
                }
            }
            return new
            {
                Leftovers = data.ToArray(),
                Messages = messages,
                Header = header
            };
        }).SelectMany(list => list.Messages);
}
Upvotes: 1
Reputation: 3488
EDIT: After Dimitri's comments, I present a revised solution below. There is one line in desperate need of refactoring, but it seems to work.
The Window overload that takes a closing selector is used so we can write custom buffering.
var hinge = new Subject<Unit>();
observableSocket
    .SelectMany(i => i) // to IObservable<byte>
    .Window(() => hinge) // kinda-like-buffer
    .Select(buff =>
    {
        return
            from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
            from payload in buff.Buffer(size)
            //Refactor line below! Window must be closed somehow..
            from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default))
            select payload;
    })
    .SelectMany(i=>i)
    .ObserveOn(ThreadPoolScheduler.Instance)
    .Select(ConvertToMessage);
EDIT 2: Removed old solution
Upvotes: 1