joelmdev
joelmdev

Reputation: 11773

Reactive Extensions - Deserializing stream from IObservable<byte[]> into individual delimited messages without the use of a Subject

I am taking the messages pumped to me from an IObservable<byte[]> and deserializing these into strings, which are then pumped out via an IObservable<string>. A Socket is populating the IObservable<byte[]> messages via a FromEventPattern conversion. The deserialized messages from the Socket are linefeed delimited strings. Since a single message received from the Socket is not required to be a single delimited string (it could be any portion of any number of messages, and partial messages are possible). The first way to solve this that came to mind was with a Subject and a closure like so:

private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;
    var subject = new Subject<string>();
    byte[] leftovers = null;

    bytes.Subscribe(current =>
    {
        var lastPositionOfLineFeed = -1;
        for (var i = 0; i < current.Length; i++)
        {
            if (current[i] == byteLineFeed)
            {
                if (leftovers != null)
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed))
                                        .ToArray()));
                    leftovers = null;
                }
                else
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            current.Slice(lastPositionOfLineFeed + 1,
                                i - lastPositionOfLineFeed)));
                }
                lastPositionOfLineFeed = i;
            }
        }
        if (lastPositionOfLineFeed != current.Length - 1)
        {
            if (leftovers != null)
            {
                leftovers = leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1))
                                        .ToArray();
            }
            else
            {
                leftovers = current.Slice(lastPositionOfLineFeed + 1,
                    current.Length - lastPositionOfLineFeed - 1);
            }
        }
    });

    return subject.AsObservable();
}

This works well, but I know that the use of Subjects is frowned upon for a variety of reasons, some of which are featured in this block of code. I feel like I might be reinventing the wheel here as I'm not thoroughly familiar with all of the methods in Rx. Could I do this without a closure and a Subject? If so, how would I go about it? Or does use of a Subject here make sense?

Upvotes: 6

Views: 1239

Answers (5)

Dave Sexton
Dave Sexton

Reputation: 2652

Here's a complete lab showing how to write a simple test client/host scenario using Rxx.

You may be most interested in the ReceiveData method at the end, because it directly addresses your question using Rxx Parsers. Notice that I'm flattening the sequence of byte[] first using SelectMany to create an IObservable<byte>, which is then fed to a simple binary parser grammar that chops it up by newline characters and projects lines of bytes as decoded strings.

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Rxx.Parsers.Reactive.Linq;

namespace Rxx.Labs.Forums
{
  public sealed class BinaryParserLab : BaseConsoleLab
  {
    private const int port = 30253;
    private const int bufferSize = 4;
    private static Encoding encoding = Encoding.ASCII;

    protected override void Main()
    {
      using (new CompositeDisposable(StartHost(), StartClient()))
      {
        WaitForKey();
      }
    }

    private IDisposable StartClient()
    {
      return Observable.Using(
        () => new TcpClient(),
        client => (from _ in client.ConnectObservable(IPAddress.Loopback, port)
                   from line in Observable.Using(() => client.GetStream(), ReceiveData)
                   select line))
                   .Subscribe(line => TraceSuccess("Received: {0}", line), () => TraceStatus("Client Completed."));
    }

    private IDisposable StartHost()
    {
      return (from client in ObservableTcpListener.Start(IPAddress.Loopback, port, maxConcurrent: 1).Take(1)
              from _ in Observable.Using(
               () => client.GetStream(),
               stream => Observable.Create<Unit>((observer, cancel) => SendData(stream, observer, cancel)))
              select Unit.Default)
              .Subscribe(_ => { }, () => TraceStatus("Host Completed."));
    }

    private async Task SendData(NetworkStream stream, IObserver<Unit> observer, CancellationToken cancel)
    {
      var data = encoding.GetBytes("Line 1\nLine 2\nLine 3\nLine 4\n");

      for (var i = 0; i < data.Length; i += bufferSize)
      {
        TraceLine("Sending: {0}", encoding.GetString(data, i, bufferSize).Replace('\n', ' '));

        await stream.WriteAsync(data, i, bufferSize, cancel).ConfigureAwait(false);
        await Task.Delay(TimeSpan.FromSeconds(1), cancel).ConfigureAwait(false);
      }
    }

    private IObservable<string> ReceiveData(NetworkStream stream)
    {
      return (from bytes in stream.ReadToEndObservable(bufferSize)
              from b in bytes
              select b)
              .ParseBinary(parser =>
              from next in parser
              let newLine = parser.Byte.Of((byte)'\n')
              let line = next.Not(newLine).NoneOrMore()
              select from bytes in line
                     from _ in newLine
                     from array in bytes.ToArray()
                     select encoding.GetString(array));
    }
  }
}

Output:

Starting BinaryParser Lab...

Sending: Line
Sending:  1 L
Received: Line 1
Sending: ine 
Sending: 2 Li
Received: Line 2
Sending: ne 3
Sending:  Lin
Received: Line 3
Sending: e 4 
Received: Line 4
Host Completed.
Client Completed.

BinaryParser Lab Completed.

Upvotes: 2

James World
James World

Reputation: 29776

Ignoring the probably superior use of BCL provided constructs to create a TextReader, if that can be made to fit your scenario, I wondered about how I would do this in an Rx idiomatic way and came up with the following short query that avoids custom operators and subjects and all that:

var messages =
    arrayStream.Select(bytes => bytes.ToObservable()).Concat()
               .Publish(ps => ps.Where(p => p != 10)
                                .Buffer(() => ps.Where(p => p == 10)))
               .Select(ls => Encoding.ASCII.GetString(ls.ToArray()));

Breaking it down

I assume ASCII encoding (as you had in your question) in order to assume a byte value 10 is a line delimiter - with a multi-byte encoding scheme this would be naïve and a more complex framing algorithm is required (one of the reasons why leaning on BCL provided stream infrastructure is probably better).

So, assuming a stream of byte arrays IObservable<byte[]> arrayStream then we can flatten to a stream of IObservable<byte> like this:

arrayStream.Select(bytes => bytes.ToObservable()).Concat()

This uses Select + Concat rather than SelectMany in order to guarantee that the bytes are streamed out in strict order. I edited this in on a hunch - I've not analyzed the code to be sufficiently comfortable that there isn't a possibility of a subsequent array overlapping with a previous one without this extra guard - and I think it will come down to the scheduler being used. If you are interested, check the Rx source here. Anyway, better to be safe. This done, we can buffer up a list of line delimited bytes like this:

.Pubish(ps => ps.Where(p => p != 10)
                .Buffer(() => ps.Where(p => p == 10)))

Here we are publishing because we will subscribe to the byte stream twice, so we must multicast it. We buffer on a byte stream stripped of line feeds, and supply a buffer closing function that watches for line feeds. This emits lists of bytes that have been delimited.

Finally, we decode the messages with a simple projection:

.Select(ls => Encoding.UTF8.GetString(ls.ToArray()));

Here's a fully working example demonstrating some messages whose framing is split between several packets:

var rawMessages = new byte[][] {
    Encoding.ASCII.GetBytes("This\ni"),
    Encoding.ASCII.GetBytes("s\na"),
    Encoding.ASCII.GetBytes("\ntest!\n")
};

var arrayStream = new Subject<byte[]>();

var messages =
    arrayStream.Select(bytes => bytes.ToObservable()).Concat()
               .Publish(ps => ps.Where(p => p != 10)
                                .Buffer(() => ps.Where(p => p == 10)))
               .Select(ls => Encoding.ASCII.GetString(ls.ToArray())); 

messages.Subscribe(b => Console.WriteLine(b));

foreach(var array in rawMessages)
{
    arrayStream.OnNext(array);
}    

Output:

This
is
a
test!

Notes Caveats

The Rx maybe idiomatic, but there's lots to think about here. I've ignored performance here in stripping down the byte arrays and building them up again (but then network is miles slower than compute so that may not be a concern). I've assumed error handling is all dealt with upstream in producing the IObservable<byte[]>, and I've not dealt with decoding errors - there's no attention to timeouts, network issues etc. I've assumed the last message has the linefeed postfixed.

If you want multiple observers to subscribe to this stream, be sure to multicast it via Publish if you need to avoid multiple subscriptions to the underlying IObservable<byte[]>.

Upvotes: 3

Enigmativity
Enigmativity

Reputation: 117027

This seemed like an easy approach to me:

private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    return Observable.Create<string>(o =>
    {
        var subject = new Subject<string>();
        return new CompositeDisposable(
            subject.Subscribe(o),
            bytes
                .Aggregate(
                    "",
                    (s, bs) =>
                    {
                        var parts = (s + Encoding.ASCII.GetString(bs))
                            .Split(new [] { (char)10 });
                        foreach (var part in parts.Take(parts.Length - 1))
                        {
                            subject.OnNext(part);
                        }
                        return parts.Last();
                    })
                .Subscribe(subject));
    });
}

I tested it with some dummy data and it worked fine for me.

One of the key things you should do defining this kind of operator is to wrap it in a Observable.Create(...) call to make sure that the returned observable can be subscribed to by any number of observers.

Upvotes: 1

Daniel C. Weber
Daniel C. Weber

Reputation: 1011

You are right in looking for an alternative for using Subject; though there definitely are scenarios where they are extremely useful, this can be done without. But first, observe that the anonymous method passed to Subscribe is closing over the local variables 'subject' and 'leftovers', which can and will lead to bad results when you subscribe twice on the resulting IObservable<string> (e.g. both subscriptions would subscribe to the same subject, which is probably not what you want). Whenever I need to carry some state around multiple observations, I use the Scanmethod. It's first argument takes a initial state (which should be immutable since it is also shared among subscriptions), the second argument is a function that is fed your current value and returns the new state, which is then passed in on the next value and so on. You end up with an observable of these states, which you then project (by SeletMany). An implementation could look like this:

private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;

    return bytes.Scan(
        new
        {
            Leftovers = (byte[])null,
            Lines = new List<string>();
        },
        (tuple, current) =>
        {
            var lastPositionOfLineFeed = -1;
            var newLeftovers = tuple.Leftovers;
            var lines = new List<string>();

            for (var i = 0; i < current.Length; i++)
            {
                if (current[i] == byteLineFeed)
                {
                    if (tuple.Leftovers != null)
                    {
                        lines.Add(Encoding.ASCII.GetString(
                                    tuple.Leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                                        i - lastPositionOfLineFeed))
                                                .ToArray()));

                        newLeftovers = null;
                    }
                    else
                    {
                        lines.Add(Encoding.ASCII.GetString(
                                current.Slice(lastPositionOfLineFeed + 1,
                                    i - lastPositionOfLineFeed)));
                    }
                    lastPositionOfLineFeed = i;
                }
            }

            if (lastPositionOfLineFeed != current.Length - 1)
            {
                if (tuple.Leftovers != null)
                {
                    newLeftovers = tuple.Leftovers.Union(current.Slice(lastPositionOfLineFeed + 1,
                            current.Length - lastPositionOfLineFeed - 1))
                                                .ToArray();
                }
                else
                {
                    newLeftovers = current.Slice(lastPositionOfLineFeed + 1,
                        current.Length - lastPositionOfLineFeed - 1);
                }
            }

            return new
            {
                Leftovers = newLeftovers,
                Lines = lines,
            };
        })
        .SelectMany(tuple => tuple.Lines);
}

I have not tested this!

Upvotes: 1

Foole
Foole

Reputation: 4850

I would use SelectMany with a selector which returns an IEnumerable<string>.

eg:

    public static IObservable<string> GetCompleteMessage(this IObservable<byte[]> source)
    {
        const byte byteLineFeed = 10;
        IEnumerable<byte> remanider = Enumerable.Empty<byte>();

        Func<byte[], IEnumerable<string>> selector = data =>
        {
            var result = new List<string>();
            var current = new ArraySegment<byte>(data);

            while (true)
            {
                var dividerOffset = ((IList<byte>)current).IndexOf(byteLineFeed);

                if (dividerOffset == -1) // No newline found
                {
                    remanider = remanider.Concat(current);
                    break;
                }

                var segment = new ArraySegment<byte>(current.Array, current.Offset, dividerOffset);
                var lineBytes = remanider.Concat(segment).ToArray();
                result.Add(Encoding.ASCII.GetString(lineBytes));

                remanider = Enumerable.Empty<byte>();
                current = new ArraySegment<byte>(current.Array, current.Offset + dividerOffset + 1, current.Count - 1 - dividerOffset);
            }

            return result;
        };

        return source.SelectMany(selector);
    }

Alternatively, you could use a NetworkStream and StreamReader to achieve the same result:

    public static IObservable<string> ReadLineObservable(this TextReader reader)
    {
        return Observable.FromAsync(() => reader.ReadLineAsync())
            .Repeat()
            .TakeWhile(x => x != null);
    }

Upvotes: 4

Related Questions