Reputation: 18939
I'm building an observable from an event that outputs lines of text. The lines form records, which are separated by two empty lines. For example:
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
I'd like the observable to split the output by two new lines, so the subscriber gets the data in chunks.
How can I accomplish this using RxJS? It doesn't seem to have a function to perform this job.
I could subscribe to the observable, accumulate the values and re-emit them myself, but I believe there is a more elegant solution that I'm not seeing.
Upvotes: 2
Views: 360
Reputation: 18665
A combination of buffer, sample and scan could also work. Basically, you accumulate the inputs (i.e. lines) in a buffer. You release this buffer every time a second observable emits a value (sample$ in the code below). You make that observable emit a value every time it detects two consecutive \n. This can be achieved with scan. Note that this requires your source$ to be a hot observable.
So you could start from the following code and let us know whether it worked in the end:
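// Accumulator for scan: sets found to true when two consecutive "\n" lines arrive.
var detect_two_lines = function (acc, new_line) {
  var last_line = acc.arr_lines[acc.arr_lines.length - 1];
  if (new_line === '\n' && last_line === '\n') {
    // Record boundary: reset the accumulated lines and raise the flag.
    return {arr_lines: [], found: true};
  }
  // Otherwise keep accumulating and clear the flag.
  return {arr_lines: acc.arr_lines.concat([new_line]), found: false};
};
var identity = function (x) { return x; };
// sample$ emits true each time a record boundary is detected.
var sample$ = source$.scan(detect_two_lines, {arr_lines: [], found: false})
  .pluck('found')
  .filter(identity);
// Release the buffered lines whenever sample$ emits.
var results$ = source$.buffer(sample$);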
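To check the splitting logic on its own, here is a minimal sketch that uses plain Array.prototype.reduce, so it runs without RxJS. The names splitRecords and chunkLines are hypothetical, and the sketch assumes an empty line arrives as an empty or whitespace-only string rather than "\n". It is one possible shape for the accumulator, not a drop-in replacement for the code above.

```javascript
// Hypothetical reducer: groups lines into records and closes a record
// when two consecutive empty lines are seen.
function splitRecords(acc, line) {
  var isEmpty = line.trim() === '';
  var last = acc.current[acc.current.length - 1];
  if (isEmpty && last === '') {
    // Second consecutive empty line: drop the first one and close the record.
    var record = acc.current.slice(0, -1);
    return { current: [], records: acc.records.concat([record]) };
  }
  if (isEmpty && acc.current.length === 0) {
    // Ignore empty lines that arrive before a record has started.
    return acc;
  }
  // Keep accumulating (a single empty line is stored as '').
  return { current: acc.current.concat([isEmpty ? '' : line]), records: acc.records };
}

// Hypothetical wrapper: runs the reducer over an array of lines and
// flushes a trailing record that was not followed by two empty lines.
function chunkLines(lines) {
  var end = lines.reduce(splitRecords, { current: [], records: [] });
  return end.current.length ? end.records.concat([end.current]) : end.records;
}

var chunks = chunkLines(['a', 'b', '', '', 'c', '', '', 'd', 'e']);
console.log(chunks);
```

A reducer with this signature could also be passed to scan, since scan takes an accumulator function and a seed in the same way reduce does. You would then still need to emit each record downstream at the moment it is closed.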
Upvotes: 3