SOK

Reputation: 595

Transformation of observable byte array to objects

I'm using Reactive Extensions (Rx) for the first time, on a project where performance is very important, and I've run into a problem.

Overview:

I'm receiving a large amount of data over a TCP socket, which I have to parse into objects and insert into a database. Each message has the following layout:

<payload-size> <payload>

Here payload-size is a uint32 (4 bytes) that gives the size of the payload that follows, in bytes.
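For illustration, the 4-byte size prefix could be decoded roughly as in the sketch below. The class and method names are made up for this example, and little-endian byte order is an assumption, since the byte order of the protocol is not stated here.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper, not part of the original code.
public static class LengthPrefix
{
    // A uint32 size prefix occupies 4 bytes.
    public const int HeaderSize = sizeof(uint);

    // Assumption: the sender writes the size in little-endian byte order.
    public static uint ReadPayloadSize(ReadOnlySpan<byte> header)
    {
        if (header.Length < HeaderSize)
            throw new ArgumentException("Need at least 4 bytes for the size prefix.", nameof(header));

        // Only the first 4 bytes of the span are read.
        return BinaryPrimitives.ReadUInt32LittleEndian(header);
    }
}
```

If the sender uses network (big-endian) byte order instead, `BinaryPrimitives.ReadUInt32BigEndian` would be the matching call.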

Problem:

I want to use the functionality that the Reactive Framework provides to parallelize the steps shown below, so that my code maximizes throughput and does not become the bottleneck. I'm also looking for best practices for implementing this.

TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)

I've already implemented the following code which provides me with an Observable (ArraySegment<byte>).

    IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
    IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
        .SelectMany(client => client.ToClientObservable(bufferSize));

I now want to transform the Observable (ArraySegment<byte>) into an Observable (Message). My first attempt looked roughly like the question linked below, because I thought I could use an observable like a stream.

Read continous bytestream from Stream using TcpClient and Reactive Extensions

Question:

Is it possible (and if so, how) to create the observable using the approach outlined below? Or is there a better approach that you would recommend? I would really appreciate a good example.

Note: The Observable (ArraySegment<byte>) behaves like a stream, so I do not know the size of the chunks it pushes to me. (Do I need to implement some kind of buffer, or can the Reactive Framework help me?)

    Observable (ArraySegment<byte>) 
    --> Buffer(4 bytes)
    --> ReadSize --> Buffer(payload-size) 
    --> ReadPayload 
    --> Parse Payload
    --> (Start over)
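The steps above can be sketched without any Rx operators as a small stateful framer that accepts chunks of arbitrary size and returns every payload that has become complete. This is only a minimal sketch: `MessageFramer` and `Feed` are hypothetical names, the size prefix is assumed to be little-endian, and the array copying is kept simple rather than tuned for throughput.

```csharp
using System;
using System.Buffers.Binary;
using System.Collections.Generic;

// Hypothetical helper: turns arbitrary chunks into complete payloads.
public sealed class MessageFramer
{
    private const int HeaderSize = sizeof(uint); // 4-byte size prefix
    private byte[] _pending = Array.Empty<byte>(); // bytes not yet consumed

    // Feed one chunk from the socket; returns the payloads completed by it.
    public List<byte[]> Feed(ArraySegment<byte> chunk)
    {
        // Append the new chunk to the leftovers of the previous call.
        var data = new byte[_pending.Length + chunk.Count];
        _pending.CopyTo(data, 0);
        chunk.AsSpan().CopyTo(data.AsSpan(_pending.Length));

        var payloads = new List<byte[]>();
        int offset = 0;

        while (data.Length - offset >= HeaderSize)
        {
            // Assumption: little-endian size prefix. Sizes above int.MaxValue throw.
            int size = checked((int)BinaryPrimitives.ReadUInt32LittleEndian(
                data.AsSpan(offset, HeaderSize)));

            // Payload not complete yet: wait for more data.
            if (data.Length - offset - HeaderSize < size) break;

            payloads.Add(data.AsSpan(offset + HeaderSize, size).ToArray());
            offset += HeaderSize + size; // start over with the next message
        }

        // Keep whatever could not be parsed for the next call.
        _pending = data.AsSpan(offset).ToArray();
        return payloads;
    }
}
```

Because the framer keeps state, one instance per connection would be needed. Under that assumption it could be attached to a client's byte observable with something like `SelectMany(chunk => framer.Feed(chunk))`, leaving the payload parsing to a later `Select`.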

Thanks in advance! :)

Upvotes: 3

Views: 1105

Answers (2)

SOK

Reputation: 595

Here is the solution I ended up using. Feel free to comment with possible improvements.

    public static IObservable<DataMessage> Convert(IObservable<ArraySegment<byte>> bytes)
    {
        const int headerSize = 12; // bytes

        return bytes.Scan(
            // Accumulator: unparsed bytes, the messages completed in this step,
            // and a header that is still waiting for its payload (if any).
            new
            {
                Leftovers = new byte[0],
                Messages = new List<DataMessage>(),
                Header = (Header) null
            },
            (saved, current) =>
            {
                // Prepend the leftovers of the previous step to the new chunk.
                var data = ConcatdArrays(saved.Leftovers, current.ToArray());
                var messages = new List<DataMessage>();
                var header = saved.Header;

                while (true)
                {
                    // Header
                    if (header == null && data.Count >= headerSize)
                    {
                        header = ReadHeader(ref data, headerSize);
                    }

                    // Payload
                    else if (header != null)
                    {
                        var type = header.Type;
                        var size = DataItem.Size(type);

                        if (data.Count < size) break; // Still missing data

                        // Create new message with the gathered data
                        var payload = ReadPayload(ref data, size);
                        messages.Add(new DataMessage(header, payload));
                        header = null;
                    }

                    // Can't do more with the available data - try again next round.
                    else
                    {
                        break;
                    }
                }

                return new
                {
                    Leftovers = data.ToArray(),
                    Messages = messages,
                    Header = header
                };
            }).SelectMany(list => list.Messages);
    }
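The helpers `ConcatdArrays`, `ReadHeader` and `ReadPayload` are not shown above. As a rough idea of what they might do, here is a sketch under the assumption that `data` is an `ArraySegment<byte>` (which fits the `Count`, `ToArray()` and `ref` usage). The names `SegmentReader`, `Concat` and `Take` are hypothetical and are not the original implementations.

```csharp
using System;

// Hypothetical stand-ins for the helpers that the answer does not show.
public static class SegmentReader
{
    // Joins the leftovers and the new chunk into one contiguous segment.
    public static ArraySegment<byte> Concat(byte[] first, byte[] second)
    {
        var joined = new byte[first.Length + second.Length];
        Buffer.BlockCopy(first, 0, joined, 0, first.Length);
        Buffer.BlockCopy(second, 0, joined, first.Length, second.Length);
        return new ArraySegment<byte>(joined);
    }

    // Copies the first 'size' bytes out of 'data' and advances 'data' past them.
    public static byte[] Take(ref ArraySegment<byte> data, int size)
    {
        if (size < 0 || size > data.Count)
            throw new ArgumentOutOfRangeException(nameof(size));

        var result = new byte[size];
        Buffer.BlockCopy(data.Array, data.Offset, result, 0, size);

        // Advancing only moves the offset; the underlying array is not copied.
        data = new ArraySegment<byte>(data.Array, data.Offset + size, data.Count - size);
        return result;
    }
}
```

A `ReadHeader` or `ReadPayload` along these lines would call `Take` and then parse the returned bytes into the corresponding type.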

Upvotes: 1

supertopi

Reputation: 3488

EDIT: After Dimitri's comments, I present a revised solution below. There is one line in desperate need of refactoring, but it seems to work.

The Window overload that takes a closing selector is used so that we can write custom buffering.

    var hinge = new Subject<Unit>();

    observableSocket
        .SelectMany(i => i) // to IObservable<byte>
        .Window(() => hinge) // kinda-like-buffer
        .Select(buff =>
        {
            return
                from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
                from payload in buff.Buffer(size)
                // Refactor line below! Window must be closed somehow..
                from foo in Observable.Return(Unit.Default).Do(_ => hinge.OnNext(Unit.Default))
                select payload;
        })
        .SelectMany(i => i)
        .ObserveOn(ThreadPoolScheduler.Instance)
        .Select(ConvertToMessage);

EDIT 2: Removed old solution

Upvotes: 1
