Reputation: 17261
I have a stream of strings and I want to emit each line. Basically from this:
let stream$ = from(['hello\n', 'world ', ' at\nhome\n and', ' more'])
I want to emit this stream:
'hello'
'world  at'
'home'
' and more'
My guess is that I need the merge operator after making sure there are no stream values with more than one line break. Something like:
let break$ = new Subject()
stream$.pipe(
  flatMap(x => x.match(/[^\n]+\n?|\//g)),
  map(x => {
    if (x.endsWith('\n')) {
      break$.next(true)
      return x
    }
    return x
  }),
  buffer(break$)
)
However, this pipe emits a single value, even though I am sure break$ really fires whenever the stream of values should form a group. Currently this is the output:
[ 'hello\n', 'world ', ' at\n', 'home\n', ' and', ' more' ]
but my expectation was
[
['hello\n'],
['world ', ' at\n'],
['home\n'],
[' and', ' more'],
]
I do have a working solution, but it triggers a subscription, and I would rather have a pipe for lazy evaluation.
Upvotes: 1
Views: 138
Reputation: 17762
If I understand the problem correctly, one approach is centered on the scan operator.
scan is an operator that holds a state (like reduce) but emits a value for each notification that comes from upstream. We need to keep a state because, for instance, if we receive from upstream a string that does not contain a \n, that string has to be stored in the state and reused with the next notification (basically, it has to be prepended to the next string notified).
This is the code implementing this approach, with comments:
import { from, merge } from 'rxjs';
import { map, mergeMap, scan, share, last } from 'rxjs/operators';
let stream$ = from([
  'hello\n',
  'world ',
  ' at\nhome\n and',
  ' more',
  '\n and even more',
]);
let out$ = stream$.pipe(
  // scan keeps the state between subsequent notifications coming from upstream
  scan(
    // state is the value holding the state, val is the value of the notification
    // coming from upstream
    (state, val) => {
      // if the string received from upstream includes \n, then we need to split it
      if (val.includes('\n')) {
        // we take whatever tail can come from the previous notification (the tail
        // is the part of the string that comes after the last \n)
        const prevTail = state.tail;
        // reset the tail
        state.tail = '';
        // concatenate the tail with the string received, then split
        // (empty strings, and therefore empty lines, are dropped)
        const lines = (prevTail + val)
          .split('\n')
          .filter((ls) => ls.length > 0);
        // store in the state the lines found with the split
        state.lines = lines;
        // if the string received does not end with \n, the last part of the
        // split is an incomplete line and becomes the next tail
        if (val[val.length - 1] !== '\n') {
          state.tail = lines[lines.length - 1];
          state.lines = lines.slice(0, lines.length - 1);
        }
        // notifies downstream the state
        return state;
      }
      // if there is no \n in the string, reset the lines in the state and
      // append the string to the tail
      state.lines = [];
      state.tail = state.tail + val;
      return state;
    },
    // initialize the state
    { tail: '', lines: [] }
  ),
  // share is necessary since this stream will be used to compose 2 other streams
  share()
);
// this is the stream that notifies all the lines apart from the last one
const outFirst$ = out$.pipe(
  map((obj) => obj.lines),
  // here mergeMap (aka flatMap) is used to flatten an array of strings into a stream
  // of strings
  mergeMap((ls) => ls)
);
// this is the stream that notifies the last string (the tail left when the source completes)
const outLast$ = out$.pipe(
  last(),
  map((obj) => obj.tail)
);
// merge the 2 streams and then subscribe
merge(outFirst$, outLast$).subscribe({
  next: (x) => {
    console.log(x);
  },
});
This is a stackblitz implementing the solution.
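The state handling described above can also be isolated as a plain function, which makes it easy to check without RxJS. The following is only a minimal sketch: the names splitStep and splitAll are hypothetical and not part of the answer's code, and unlike the accumulator above it returns a fresh state object on every step instead of mutating the previous one.

```javascript
// Hypothetical helper (an assumption, not the answer's code): one accumulator step.
// It has the (state, value) shape that an accumulator passed to scan would have.
function splitStep(state, chunk) {
  // Prepend whatever was left over from earlier chunks, then split on line breaks.
  const parts = (state.tail + chunk).split('\n');
  // The last part is the text after the final '\n' (empty if the chunk ended with one).
  const tail = parts.pop();
  // Drop empty lines, mirroring the filter used in the answer.
  const lines = parts.filter((line) => line.length > 0);
  return { tail, lines };
}

// Hypothetical helper: emulates scanning a finite list of chunks,
// then flushes the remaining tail, as the last() branch does in the answer.
function splitAll(chunks) {
  const out = [];
  let state = { tail: '', lines: [] };
  for (const chunk of chunks) {
    state = splitStep(state, chunk);
    out.push(...state.lines);
  }
  if (state.tail.length > 0) {
    out.push(state.tail);
  }
  return out;
}

console.log(splitAll(['hello\n', 'world ', ' at\nhome\n and', ' more']));
```

Because splitStep does not mutate its input, it should be usable as the accumulator in scan(splitStep, { tail: '', lines: [] }) without the seed object being changed between subscriptions, though that wiring is not tested here.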
First we start using the
Upvotes: 2