SystematicFrank
SystematicFrank

Reputation: 17261

Buffer stream values according condition

I have a stream of strings and I want to emit each line. Basically from this:

let stream$ = from(['hello\n', 'world ', ' at\nhome\n and', ' more'])

I want to emit this stream:

 'hello' 
 'world  at'
 'home'
 ' and more'

My guess is that I need the merge operator after making sure there are no stream values with more than one line break. Something like:

let break$ = new Subject()
stream$.pipe(
    flatMap(x => x.match(/[^\n]+\n?|\//g)),
    map(x => {
      if (x.endsWith('\n')) {
        break$.next(true)
        return x
      }
      return x
    })
    .buffer(break$)
)

However this pipe emits a single value, but I am sure the break$ really gets called when the stream of values should form a group. Currently this is the output:

  [ 'hello\n', 'world ', ' at\n', 'home\n', ' and', ' more' ]

but my expectation was

[
  ['hello\n'],
  ['world ', ' at\n'],
  ['home\n'],
  [' and', ' more'],
]

I do have a working solution, but that triggers a subscription and I would rather have a pipe for the lazy evaluation

Upvotes: 1

Views: 138

Answers (1)

Picci
Picci

Reputation: 17762

If I understand the problem right, this could be an approach centered on the use of scanMap operator.

scanMap operator is an operator that holds a state (like reduce) but notifies something for each notification that comes from upstream. We need to keep a state since, for instance, if we receive from upstream a string which does not contain a \n, such string has to be stored in the state to be reused in the next notification (basically it has to be attached in front of the next string notified).

This is the code implementing this approach, with comments

import './style.css';

import { from, merge } from 'rxjs'; import { map, mergeMap, scan, share, last } from 'rxjs/operators';

let stream$ = from([
  'hello\n',
  'world ',
  ' at\nhome\n and',
  ' more',
  '\n and even more',
]);

let out$ = stream$.pipe(
  // scan keeps the state between subsequent notifications coming from upstream
  scan(
    // state is the value holding the state, val is the value of the notification
    // coming from upstream
    (state, val) => {
      // if the string received from upfront includes \n, then we need to split it
      if (val.includes('\n')) {
        // we take whatever tail can come from the previous notification (the tail
        // is the part of the string that comes after the last \n)
        const prevTail = state.tail;
        // reset the tail
        state.tail = '';
        // concatenate the tail with the string received, then split
        const lines = (prevTail + val)
          .split('\n')
          .filter((ls) => ls.length > 0);
        // store in the state the lines found with the split
        state.lines = lines;
        // if the last part of the split does not end with \n, it becomes the next tail
        if (val[val.length - 1] !== '\n') {
          state.tail = lines[lines.length - 1];
          state.lines = lines.slice(0, lines.length - 1);
        }
        // notifies downstream the state
        return state;
      }
      // if there is no \n in the line, reset the lines in the state and set the tail
      state.lines = [];
      state.tail = state.tail + val;
      return state;
    },
    // initialize the state
    { tail: '', lines: [] }
  ),
  // share is necessary since this stream will be used to compose 2 other streams
  share()
);

// this is the stream that notifies all the lines apart the last one
const outFirst$ = out$.pipe(
  map((obj) => obj.lines),
  // here mergeMap (aka flatMap) is used to flatten an array of strings into a stream
  // of streams
  mergeMap((ls) => ls)
);

// this is the stream that notifies the last string
const outLast$ = out$.pipe(
  last(),
  map((obj) => obj.tail)
);

// merge the 2 streams and then subscribe
merge(outFirst$, outLast$).subscribe({
  next: (x) => {
    console.log(x);
  },
});

This is a stackblitz implementing the solution.

First we start using the

Upvotes: 2

Related Questions