Laurent

Split IObservable<byte[]> into characters, then into lines

Rx is great, but sometimes it's hard to find the elegant way to do something. The idea is quite simple: I receive events carrying a byte[], and this array might contain part of a line, several lines, or exactly one line. What I want is an IObservable of lines, i.e. an IObservable<String>, where each element of the sequence is one line.

After hours, the closest solution I found is pretty ugly and, of course, doesn't work, because Scan triggers OnNext on every char:

//Intermediate subject used to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work: it triggers OnNext on every char
    //Aggregate doesn't work either, as it doesn't return intermediate results
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
            //Data is a byte[]
            .Select(_ => _.EventArgs.Data)
            .Subscribe(array => array.ToObservable()
            //Convert into char
            .ForEach(c => outputStream.OnNext((char)c)));

Note: _reactiveSubcription should be the IObservable<String>.

Without considering character-encoding issues, what am I missing to make this work?
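One way to see what the Scan attempt is missing: Scan always emits its accumulator for every input, so the accumulator itself has to record whether the current char just completed a line, and a Where afterwards keeps only those steps. (The attempt above also returns the same mutable StringBuilder from Append on every step, so consecutive emitted values are one and the same object.) The sketch below shows that shape without Rx so it runs on its own; `Scan` here is a hypothetical IEnumerable helper written to mirror Rx's `Scan(seed, accumulator)`, and `ScanSketch`/`Lines` are illustrative names. With Rx, the same seed, accumulator and Where/Select should slot in after `outputStream`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ScanSketch
{
    // Hypothetical IEnumerable counterpart of Rx's Scan(seed, accumulator):
    // yields the accumulator after every element, just as Rx calls OnNext for every char.
    public static IEnumerable<TAcc> Scan<TSource, TAcc>(
        this IEnumerable<TSource> source, TAcc seed, Func<TAcc, TSource, TAcc> accumulator)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = accumulator(acc, item);
            yield return acc;
        }
    }

    // The state is immutable: the partial line so far, plus the line completed by
    // this step (null when the current char was not '\r').
    public static IEnumerable<string> Lines(IEnumerable<char> chars) =>
        chars
            .Scan<char, (string Partial, string Completed)>(
                ("", null),
                (state, c) => c == '\r'
                    ? ("", state.Partial)
                    : (state.Partial + c, (string)null))
            // Keep only the steps that finished a line, then unwrap the line.
            .Where(state => state.Completed != null)
            .Select(state => state.Completed);
}
```

For example, `ScanSketch.Lines("ab\rcd\ref")` yields "ab" and "cd"; "ef" is held back because no \r has arrived for it yet.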

Kirk Shoop

This works for me.

First, convert each byte[] into a string and split that string on \r. Regex.Split keeps the delimiters when the pattern contains a capturing group, so each \r comes through as an element of its own.
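To see the delimiter-keeping behaviour in isolation, here is a small stdlib-only check (`SplitSketch` and `SplitKeepingCr` are illustrative names). A leading or trailing \r produces an empty element, which is why the pipeline filters out zero-length strings.

```csharp
using System.Linq;
using System.Text.RegularExpressions;

static class SplitSketch
{
    // Splits on '\r'. Because the pattern contains a capturing group, Regex.Split
    // also returns each matched "\r" as a separate element. A leading or trailing
    // '\r' produces an empty element, which is filtered out here.
    public static string[] SplitKeepingCr(string text) =>
        Regex.Split(text, "(\r)").Where(s => s.Length != 0).ToArray();
}
```

`SplitSketch.SplitKeepingCr("ab\rcd\r")` returns { "ab", "\r", "cd", "\r" }; the raw `Regex.Split` result has a fifth, empty element at the end.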

Now there is a stream of strings, some of which end in \r (the delimiters themselves).

Then Concat them to keep them in order. Also, since strings needs to be hot for the next step, Publish it (with RefCount).

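using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Text.RegularExpressions;

var strings = bytes.
  // Decode every byte of the chunk, then split on \r; the capturing group keeps each \r
  Select(arr => (Regex.Split(Encoding.Default.GetString(arr), "(\r)")).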
    Where(s=> s.Length != 0).
    ToObservable()).
  Concat().
  Publish().
  RefCount();

Make a Window of strings that ends when a string ends with \r. strings needs to be hot since it is used for both the Window contents and the end-of-window trigger.

var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));

Aggregate each Window into a single string.

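// Seeded Aggregate: an empty window yields "" instead of an error; those are dropped.
var lines = linewindows.
  SelectMany(w => w.Aggregate(string.Empty, (l, r) => l + r)).
  Where(l => l.Length != 0);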
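An unseeded Aggregate has no value to produce for an empty window: in LINQ to Objects it throws InvalidOperationException, and Rx's unseeded Aggregate reports the same condition through OnError. Whether an empty window actually appears depends on the input and on the order in which Window observes a boundary and its element, so a seeded Aggregate is the more defensive choice. The sketch below demonstrates the difference with LINQ to Objects rather than Rx; `AggregateSketch` and its members are illustrative names.

```csharp
using System;
using System.Linq;

static class AggregateSketch
{
    // Unseeded Aggregate on an empty sequence throws InvalidOperationException.
    public static bool UnseededThrowsOnEmpty()
    {
        try
        {
            Enumerable.Empty<string>().Aggregate((l, r) => l + r);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Seeded Aggregate simply returns the seed for an empty sequence.
    public static string SeededOnEmpty() =>
        Enumerable.Empty<string>().Aggregate(string.Empty, (l, r) => l + r);

    // Joining the pieces of one window, the same fold the Rx pipeline applies per window.
    public static string JoinWindow(params string[] pieces) =>
        pieces.Aggregate(string.Empty, (l, r) => l + r);
}
```

`AggregateSketch.JoinWindow("ab", "\r")` returns "ab\r", while `SeededOnEmpty()` returns "" where the unseeded form would fail.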

lines is an IObservable<String>, and each string contains one line.

To test this, I used the following generator to produce an IObservable<byte[]>:

var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
    Return((byte)('A' + i)).
    Repeat(24).
    Concat(Observable.
        Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());
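As an alternative that does not depend on Publish/Window ordering, chunk boundaries can also be handled by a small stateful splitter that carries the unterminated tail of one chunk over to the next. This is a sketch under stated assumptions, not the answer's method: `LineSplitter`, `Push` and `Flush` are hypothetical names, lines are returned without their \r, and it assumes ASCII data (as in the generator above) so no character is split across two chunks. For multi-byte encodings, a `System.Text.Decoder` from `Encoding.GetDecoder()` keeps partial characters between calls.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: turns arbitrary byte[] chunks into complete lines,
// carrying any unterminated tail over to the next chunk.
sealed class LineSplitter
{
    private readonly StringBuilder _pending = new StringBuilder();

    // Decodes one chunk (assumed ASCII) and returns the lines it completes, without the '\r'.
    // The result is materialized so the internal state is updated exactly once per chunk.
    public IReadOnlyList<string> Push(byte[] chunk)
    {
        var completed = new List<string>();
        foreach (char c in Encoding.ASCII.GetString(chunk))
        {
            if (c == '\r')
            {
                completed.Add(_pending.ToString());
                _pending.Clear();
            }
            else
            {
                _pending.Append(c);
            }
        }
        return completed;
    }

    // Whatever is left when the source completes (an unterminated last line), or null.
    public string Flush() => _pending.Length == 0 ? null : _pending.ToString();
}
```

With Rx, one splitter per subscription could be created with something like `Observable.Defer(() => { var splitter = new LineSplitter(); return bytes.SelectMany(chunk => splitter.Push(chunk)); })`, using the SelectMany overload that takes an IEnumerable selector.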
