Reputation: 1109
I have a small question in regards to RX. I have a stream of symbols coming in from keyboard, and i need to partition them into groups. A new group should be started when a ';' symbols comes from the stream. In simpler terms, I need an operator kinda like Buffer, but that fires when a certain condition is true, rather than after some time delay or event count. Is there a way to build it with operators already present in RX, or should I enroll my own?
Upvotes: 3
Views: 698
Reputation: 18125
Here's a non-RefCount version of Nikolai's answer. This provides more explicit synchronization of subscription and disposition, and should remove a race condition which occurs when your source is observed on a different thread than which your consumer is subscribed on (which is often times the case when you're dealing with UIs).
var groups = Observable.Create(o => {
var publishedSource = source.Publish();
return new CompositeDisposable(
publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o),
publishedSource.Connect()
);
});
Upvotes: 2
Reputation: 18125
Here's a source.
var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable();
Here's what you're asking for:
var groups = source
// Group the elements by some constant (0)
// and end the group when we see a semicolon
.GroupByUntil(x => 0, group => group.Where(x => x == ';'))
Here's a way to use it:
groups
// Log that we're on the next group now.
.Do(x => Console.WriteLine("Group: "))
// Merge / Concat all the groups together
// {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;}
.Merge()
// Ignore the semicolons? This is optional, I suppose.
.Where(x => x != ';')
// Log the characters!
.Do(x => Console.WriteLine(" {0}", x))
// Make it so, Number One!
.Subscribe();
Output:
Group:
a
b
Group:
c
d
e
Upvotes: 3
Reputation: 29
We can use the Buffer override with boundary observable where the boundary observable is our initial stream filtered to only semicolon entries.
//this is our incoming stream
IObservable<char> keyboardStream;
//if the observable is cold we need to do this
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it
var persistedStream = keyboardStream.Publish().RefCount();
var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';'));
Upvotes: 2