Reputation: 5216
I'm trying to parse an incoming stream of bytes that represents messages. I need to split the stream and create a message structure for each part.
A message always starts with a 0x81 (BOM) and ends with a 0x82 (EOM).
start: 0x81
header: 3 bytes
data: arbitrary length
stop: 0x82
The data part is escaped using an escape byte 0x1B (ESC): any time a byte in the data part equals one of the control bytes {ESC, BOM, EOM}, it is prefixed with ESC.
The header part is not escaped, and may contain control bytes.
I would like to code this in a functional reactive style using Rx.Net, by consuming an IObservable<byte> and transforming it into an IObservable<Message>.
What is the most idiomatic way to do this?
Some examples:
[81 01 02 03 82] single message
[81 82 81 82 82] single message, header = [82 81 82]
[81 01 02 1B 82] single message, header = [01 02 1B].
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82]
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored.
header = [01 02 03], (unescaped) data = [1B]
Here's a state machine drawing for this:
Upvotes: 8
Views: 515
Reputation: 10783
If you are looking for something that is "more functional", then this may help; however, the answer by @Evk passes these tests too.
Firstly, may I suggest that, to take the legwork out of providing a verifiable answer, you provide a test suite to implement against for complex questions like this.
Something like this would have been very helpful.
// Test fixture: a TestScheduler drives both the byte source and the recording observer.
var scheduler = new TestScheduler();
// Cold observable of timed byte notifications. Only the BOM / EOM / escape-related
// ticks are listed here; the ticks in between (header bytes) are not shown in this listing.
var source = scheduler.CreateColdObservable<byte>(
ReactiveTest.OnNext<byte>(01,0x81), //BOM m1
ReactiveTest.OnNext<byte>(05,0x82), //EOM m1
ReactiveTest.OnNext<byte>(06,0x81), //BOM m2
ReactiveTest.OnNext<byte>(10,0x82), //EOM m2
ReactiveTest.OnNext<byte>(11,0x81), //BOM m3
ReactiveTest.OnNext<byte>(15,0x82), //EOM m3
ReactiveTest.OnNext<byte>(16,0x81), //BOM m4
ReactiveTest.OnNext<byte>(20,0x1B), //Control character
ReactiveTest.OnNext<byte>(21,0x82), //Data
ReactiveTest.OnNext<byte>(22,0x82), //EOM m4
ReactiveTest.OnNext<byte>(23,0x81), //BOM m5
ReactiveTest.OnNext<byte>(27,0x1B), //Control character
ReactiveTest.OnNext<byte>(28,0x1B), //Data
ReactiveTest.OnNext<byte>(29,0x82), //EOM m5
ReactiveTest.OnNext<byte>(30,0x82));//Ignored (expected 0x81)
// Observer for the Message notifications produced by the parser under test.
var observer = scheduler.CreateObserver<Message>();
// Expected notifications: one Message at the tick of each message's EOM byte
// (05, 10, 15, 22, 29). No Message is expected for the trailing 0x82 at tick 30.
// NOTE(review): the end of this array and the assertion that uses it are not visible here.
new[] {
ReactiveTest.OnNext(05, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[0]{}}),
ReactiveTest.OnNext(10, new Message(){Header=new byte[]{0x82, 0x81, 0x82}, Data=new byte[0]{}}),
ReactiveTest.OnNext(15, new Message(){Header=new byte[]{0x01, 0x02, 0x1B}, Data=new byte[0]{}}),
ReactiveTest.OnNext(22, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x82}}),
ReactiveTest.OnNext(29, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x1B}}),
I have also written a version of Message that allows me to verify the code:
// A parsed protocol message: a 3-byte header plus the unescaped data bytes.
// Also defines the protocol's control byte values and content-based equality,
// so that tests can compare expected and actual messages.
public class Message
// Start-of-message marker byte.
public static readonly byte BOM = 0x81;
// End-of-message marker byte.
public static readonly byte EOM = 0x82;
// Escape byte that prefixes a control byte occurring in the data part.
public static readonly byte Control = 0x1B;
// The header bytes (Create always produces exactly 3).
public byte[] Header { get; set; }
// The data bytes of the message.
public byte[] Data { get; set; }
// Builds a Message from a raw buffer: the first 3 bytes are copied into the header
// and the remaining bytes are scanned from index 3 while tracking the escape state.
// NOTE(review): the guard conditions for the two throws and the branch that appends
// to `body` are not visible in this listing — confirm against the original source.
public static Message Create(byte[] bytes)
throw new ArgumentNullException(nameof(bytes));
// NOTE(review): .Dump() looks like the LINQPad extension method — confirm it is
// available wherever this code is compiled.
throw new ArgumentException("bytes<3").Dump();
var header = new byte[3];
Array.Copy(bytes, header, 3);
var body = new List<byte>();
var escapeNext = false;
// Index 3 is the first byte after the header.
for (int i = 3; i < bytes.Length; i++)
var b = bytes[i];
// An unescaped Control byte only arms the escape flag for the following byte.
if (b == Control && !escapeNext)
escapeNext = true;
escapeNext = false;
var msg = new Message { Header = header, Data = body.ToArray()};
return msg;
// Renders header and data as comma-separated hexadecimal byte values.
public override string ToString()
return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
// Formats each byte with the "X" (uppercase hex) format and joins them with commas.
private static string ByteArrayString(byte[] bytes)
return string.Join(",", bytes.Select(b => b.ToString("X")));
// Equality against an arbitrary object; delegates to Equals(Message).
// NOTE(review): the condition guarding `return false` is not visible in this listing.
public override bool Equals(object obj)
var other = obj as Message;
return false;
return Equals(other);
// Two messages are equal when both Header and Data match element by element.
protected bool Equals(Message other)
return IsSequenceEqual(Header, other.Header)
&& IsSequenceEqual(Data, other.Data);
// Null-tolerant sequence comparison: two nulls are equal, a single null is not.
private bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
if(expected==null && other==null)
return true;
if(expected==null || other==null)
return false;
return expected.SequenceEqual(other);
// NOTE(review): Equals compares array contents (SequenceEqual), but this combines
// the arrays' own GetHashCode values, which for arrays are not content-based.
// Two equal Messages can therefore produce different hash codes; consider hashing
// the elements instead if instances are ever used as dictionary/set keys.
public override int GetHashCode()
return ((Header != null ? Header.GetHashCode() : 0) * 397) ^ (Data != null ? Data.GetHashCode() : 0);
Now that I have all the plumbing, I can focus on the actual problem.
// Rx pipeline that turns a stream of bytes into a stream of Messages.
// The source is wrapped in Publish(selector); bytes are skipped until the first BOM
// and then folded one at a time through an Accumulator.
// NOTE(review): the operators between Scan and Select — which must turn the
// Accumulator values into the byte[] buffers passed to Message.Create — and the
// closing of the Publish lambda are not visible in this listing.
public static IObservable<Message> MyAnswer(IObservable<byte> source)
return source.Publish(s =>
//Start consuming once we see a BOM
s.SkipWhile(b => b != Message.BOM)
// Scan feeds each byte into the accumulator; Accumulate returns the same mutated instance.
.Scan(new Accumulator(), (acc, cur)=>acc.Accumulate(cur))
.Select(buffer => Message.Create(buffer))
// Mutable per-byte parser state intended for use with Scan: it remembers the most
// recent byte, whether that byte was escaped, and whether the next byte will be.
// NOTE(review): no statement visible in this listing ever changes _index, so as shown
// IsHeader() would always be true. Presumably an increment/reset was lost — confirm
// against the original source.
public class Accumulator
// Position counter used by IsHeader() and IsBeginingOfMessage().
private int _index = 0;
// The byte most recently passed to Accumulate.
private byte _current =0;
// True when _current was preceded by an unescaped Control byte in the data part.
private bool _isCurrentEscaped = false;
// True when the byte that follows _current must be treated as escaped.
private bool _isNextEscaped = false;
// Records b as the current byte and advances the escape flags.
// Returns this same instance (state is mutated in place, not copied).
public Accumulator Accumulate(byte b)
_current = b;
// The escape decision made for the previous byte now applies to this one.
_isCurrentEscaped = _isNextEscaped;
// Only an unescaped Control byte outside the header escapes the following byte.
_isNextEscaped = (!IsHeader() && !_isCurrentEscaped && b==Message.Control);
return this;
// The byte most recently accumulated.
public byte Value()
return _current;
// True while _index is below 5.
// NOTE(review): presumably 5 covers the BOM plus the 3 header bytes with a 1-based
// index — confirm.
private bool IsHeader()
return _index < 5;
// True when the current byte is a BOM at position 1.
public bool IsBeginingOfMessage()
return _index == 1 && _current == Message.BOM;
// True when the current byte is an unescaped EOM outside the header.
public bool IsEndOfMessage()
return !IsHeader()
&& _current == Message.EOM
&& !_isCurrentEscaped;
For completeness, here is the guts of @Evk's answer so you can easily swap in and out implementations.
// Hand-rolled state-machine parser wrapped in Observable.Create: each subscription
// gets its own parsing state (the locals below) and subscribes to the byte source.
// NOTE(review): the statements that append to or clear `header` and `body` are not
// visible in this listing — confirm against the original source.
public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
return Observable.Create<Message>(o =>
// some crude parsing code for the sake of example
// Set when an unescaped 0x1B has been seen and the next byte must be taken literally.
bool nextIsEscaped = false;
// True between a BOM and the point where the header is complete.
bool readingHeader = false;
// True once the header is complete, until the EOM.
bool readingBody = false;
List<byte> body = new List<byte>();
List<byte> header = new List<byte>();
return source.Subscribe(b =>
// Control bytes are only interpreted when not escaped and not inside the header.
if (b == 0x81 && !nextIsEscaped && !readingHeader)
// start
readingHeader = true;
readingBody = false;
nextIsEscaped = false;
else if (b == 0x82 && !nextIsEscaped && !readingHeader)
// end
readingHeader = false;
readingBody = false;
// Emit only if something was collected, so a stray EOM produces no message.
if (header.Count > 0 || body.Count > 0)
o.OnNext(new Message()
Header = header.ToArray(),
Data = body.ToArray()
nextIsEscaped = false;
else if (b == 0x1B && !nextIsEscaped && !readingHeader)
nextIsEscaped = true;
if (readingHeader)
// After 3 header bytes, switch from header mode to body mode.
if (header.Count == 3)
readingHeader = false;
readingBody = true;
else if (readingBody)
nextIsEscaped = false;
Upvotes: 3
Reputation: 101453
You can just use basic building blocks: Observable.Create and Subscribe. First, let's grab some code which will help us read a stream as an observable of byte[] (there are many different examples of how to do that):
// Stream helpers for bridging System.IO streams into Rx.
static class Extensions {
// Exposes a stream as an observable of byte[] chunks, each at most bufferSize bytes long.
public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) {
// One buffer is allocated here and reused for every read.
var buffer = new byte[bufferSize];
// NOTE(review): the arguments of this call (presumably stream.BeginRead and
// stream.EndRead) are missing from this listing — confirm against the original source.
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
// Keep issuing reads for as long as the stream reports CanRead.
return Observable.While(
() => stream.CanRead,
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
// Copy out only the bytes actually read, since `buffer` is shared between reads.
.Select(readBytes => buffer.Take(readBytes).ToArray()));
Then define message class:
// Minimal message container used by this answer: header bytes and body bytes.
class Message {
// The header bytes of the message.
public byte[] Header { get; set; }
// The body bytes of the message.
public byte[] Body { get; set; }
And then put that into small console app:
// Console demo: reads a sample byte stream in chunks, parses it into Message
// instances with a small state machine, and prints each message's header and body.
// NOTE(review): the statements that append to or clear `header` and `body`, and
// several closing braces, are not visible in this listing.
static void Main(string[] args) {
// original stream
// Sample input: 0x81, three header bytes, an escaped 0x1B, then 0x82 twice.
var stream = new MemoryStream(new byte[] { 0x81, 0x01,0x02,0x03,0x1B,0x1B,0x82,0x82});
// your initial IObservable<byte[]>
IObservable<byte[]> bytes = stream.AsyncRead(128); // or any other buffer size
// your IObservable<Message>
var observable = Observable.Create<Message>(observer => {
// some crude parsing code for the sake of example
// Set when an unescaped 0x1B has been seen and the next byte must be taken literally.
bool nextIsEscaped = false;
// True between a BOM and the point where the header is complete.
bool readingHeader = false;
// True once the header is complete, until the EOM.
bool readingBody = false;
List<byte> body = new List<byte>();
List<byte> header = new List<byte>();
return bytes.Subscribe(buffer => {
// Parser state lives outside this loop, so a message may span several chunks.
foreach (var b in buffer) {
// Control bytes are only interpreted when not escaped and not inside the header.
if (b == 0x81 && !nextIsEscaped && !readingHeader) {
// start
readingHeader = true;
readingBody = false;
nextIsEscaped = false;
else if (b == 0x82 && !nextIsEscaped && !readingHeader) {
// end
readingHeader = false;
readingBody = false;
// Emit only if something was collected, so a stray EOM produces no message.
if (header.Count > 0 || body.Count > 0) {
observer.OnNext(new Message() {
Header = header.ToArray(),
Body = body.ToArray()
nextIsEscaped = false;
else if (b == 0x1B && !nextIsEscaped && !readingHeader) {
nextIsEscaped = true;
else {
if (readingHeader) {
// After 3 header bytes, switch from header mode to body mode.
if (header.Count == 3) {
readingHeader = false;
readingBody = true;
else if (readingBody)
nextIsEscaped = false;
// Print every parsed message as dash-separated hex (BitConverter.ToString format).
observable.Subscribe(msg =>
Console.WriteLine("Header: " + BitConverter.ToString(msg.Header));
Console.WriteLine("Body: " + BitConverter.ToString(msg.Body));
Upvotes: 2