Reputation: 51
Rx is great, but sometimes it's hard to find an elegant way to do something. The idea is quite simple: I receive events carrying a byte[], and this array might contain part of a line, a single line, or multiple lines. What I want is an observable of lines, i.e. an IObservable<String> where each element of the sequence is one line.
After hours of trying, the closest solution I found is pretty ugly and, of course, doesn't work, because Scan triggers OnNext on every char:
//Intermediate subject used to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work: it triggers OnNext on every char
//Aggregate doesn't work either, as it doesn't return intermediate results
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
Note: _reactiveSubcription should be the IObservable<String>.
Without considering character encoding issues, what am I missing to make this work?
Upvotes: 3
Views: 661
Reputation: 1294
This works for me.
First, convert each byte[] into a string and split the string on \r. Because the pattern wraps \r in a capture group, Regex.Split keeps the delimiters, so the result is a stream of strings, some of which end in \r.
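To make the delimiter-keeping behaviour concrete, here is a small sketch that uses only Regex.Split and LINQ, without Rx. The SplitDemo class and the SplitKeepingDelimiters helper are names made up for this illustration; the input string is arbitrary.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

public static class SplitDemo
{
    // Splits on '\r'. The capture group "(\r)" makes Regex.Split return each
    // delimiter as its own element; empty strings are filtered out afterwards.
    public static string[] SplitKeepingDelimiters(string text) =>
        Regex.Split(text, "(\r)").Where(s => s.Length != 0).ToArray();

    public static void Main()
    {
        // Prints: one, \r, two, \r, thr (each on its own line).
        foreach (var piece in SplitKeepingDelimiters("one\rtwo\rthr"))
            Console.WriteLine(piece.Replace("\r", "\\r"));
    }
}
```

Without the parentheses in the pattern, the \r elements would be dropped and there would be nothing left to mark where a line ends.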
Then Concat, to keep them in order. Also, since strings needs to be hot for the next step, Publish it.
var strings = bytes.
//Decode the whole array (a count of arr.Length - 1 would drop the last byte of every chunk)
Select(arr => Regex.Split(Encoding.Default.GetString(arr), "(\r)").
Where(s => s.Length != 0).
ToObservable()).
Concat().
Publish().
RefCount();
Make a Window of strings that ends when a string ends with \r. strings needs to be hot since it is used for both the Window contents and the end-of-window trigger.
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));
Aggregate each Window into a single string.
var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));
lines is an IObservable<String> in which each string contains one line.
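For comparison, here is a rough sketch of a different way to get the same kind of sequence: keep the unfinished line in a buffer and emit a string whenever a \r arrives. This is not the Window-based approach above. ToLines and TakeCompleteLines are hypothetical helper names, the sketch assumes the System.Reactive package, and unlike the pipeline above it strips the \r from each line. It also decodes each chunk on its own, so it ignores encoding issues just as the question does, and any text after the last \r is never emitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;

public static class LineSplitter
{
    // Hypothetical helper, not part of Rx: turns chunks of bytes into complete lines.
    public static IObservable<string> ToLines(this IObservable<byte[]> chunks, Encoding encoding)
    {
        // Defer gives every subscriber its own buffer for the unfinished line.
        return Observable.Defer(() =>
        {
            var pending = new StringBuilder();
            return chunks.SelectMany(chunk => TakeCompleteLines(pending, encoding.GetString(chunk)));
        });
    }

    // Appends text to the pending buffer and returns every line completed by a '\r'.
    private static IEnumerable<string> TakeCompleteLines(StringBuilder pending, string text)
    {
        var completed = new List<string>();
        foreach (char c in text)
        {
            if (c == '\r')
            {
                completed.Add(pending.ToString());
                pending.Clear();
            }
            else
            {
                pending.Append(c);
            }
        }
        return completed;
    }
}

public static class Program
{
    public static void Main()
    {
        // Three chunks: part of a line, the rest plus a whole line, and an unfinished tail.
        var chunks = new[] { "ab", "c\rde\r", "f" }
            .Select(s => Encoding.ASCII.GetBytes(s))
            .ToObservable();

        // Prints "abc" then "de"; the trailing "f" has no '\r' and stays in the buffer.
        chunks.ToLines(Encoding.ASCII).Subscribe(line => Console.WriteLine(line));
    }
}
```

The trade-off is that the state lives in a closure instead of being expressed with operators, but the source does not have to be hot and each chunk is handled in a single pass.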
To test this I used the following generator to produce an IObservable<byte[]>:
var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
Return((byte)('A' + i)).
Repeat(24).
Concat(Observable.
Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());
Upvotes: 7