Mehdi Saffar

Reputation: 770

How to branch off of an rxjs stream conditionally?

I am trying to simulate the "brush" feature like the one in any image editor.

I have the following streams:

What I want to do

While the user is dragging the mouse, do temporary calculations. When the mouse button is released, commit those changes. If the Escape key is pressed instead, do not commit them.

What I am currently handling is the first case:

$pointerDown.pipe(
   r.switchMap(() =>
       $position.pipe(
           r.throttleTime(150),
           r.map(getNodesUnderBrush),
           r.tap(prepareChanges),
           r.takeUntil($pointerUp),
           r.finalize(commitBrushStroke))
)).subscribe()

How can I end the stream in two different ways? What is the idiomatic rxjs for this?

Thanks

Upvotes: 5

Views: 2126

Answers (5)

Kari F.

Reputation: 1434

Here is another possible solution, very much like your original:

    const start$ = fromEvent(document, "mousedown").pipe(
      tap((event: MouseEvent) => this.start = `x: ${event.clientX}, y: ${event.clientY}`)
    );
    const drag$ = fromEvent(document, "mousemove").pipe(
      tap((event: MouseEvent) => (this.move = `x: ${event.clientX}, y: ${event.clientY}`))
    );
    const stop$ = fromEvent(document, "mouseup").pipe(
      tap((event: MouseEvent) => (this.stop = `x: ${event.clientX}, y: ${event.clientY}`))
    );
    const cancel$ = fromEvent(document, "keydown").pipe(
      filter((e: KeyboardEvent) => e.code === "Escape"),
      tap(() => this.stop = 'CANCELLED')
    );

    const source$ = start$
      .pipe(
        mergeMap(() =>
          drag$.pipe(
            takeUntil(merge(stop$, cancel$))
          )
        )
      )
      .subscribe();

The stream can end in two ways because both stop conditions are merged into a single takeUntil notifier:

takeUntil(merge(stop$, cancel$))

Here is the Stackblitz: https://stackblitz.com/edit/angular-gw7gyr
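
To see the merged notifier at work outside the browser, here is a minimal sketch in which plain Subjects stand in for the DOM event streams (an assumption made only so the events can be fired by hand). Note that downstream cannot tell which notifier fired; in the answer above that information is recorded by the tap side effects on stop$ and cancel$.

```typescript
import { Subject, merge } from "rxjs";
import { mergeMap, takeUntil } from "rxjs/operators";

// Hypothetical stand-ins for the mousedown / mousemove / mouseup / keydown streams.
const start$ = new Subject<void>();
const drag$ = new Subject<number>();
const stop$ = new Subject<void>();
const cancel$ = new Subject<void>();

const seen: number[] = [];

start$
  .pipe(mergeMap(() => drag$.pipe(takeUntil(merge(stop$, cancel$)))))
  .subscribe((value) => seen.push(value));

// First drag, ended by the "mouseup" stand-in.
start$.next();
drag$.next(1);
drag$.next(2);
stop$.next();
drag$.next(99); // ignored: no drag is in progress

// Second drag, ended by the "Escape" stand-in.
start$.next();
drag$.next(3);
cancel$.next();
drag$.next(100); // ignored as well

console.log(seen);
```

Only the values emitted between a start and the first of stop$ or cancel$ reach the subscriber.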

Upvotes: 0

Picci

Reputation: 17762

If I understand correctly, when the user presses the Escape key, the entire stream should be unsubscribed so that, when the user starts dragging again with a pointer-down, the stream starts emitting again.

If this is the case, you may want to try switchMap with $escape and merge. In other words, something like this:

const drag$ = $pointerDown.pipe(
   r.switchMap(() =>
       $position.pipe(
           r.throttleTime(150),
           r.map(getNodesUnderBrush),
           r.tap(prepareChanges),
           r.takeUntil($pointerUp),
           r.finalize(commitBrushStroke))
))
const brush$ = $escape.pipe(
   r.startWith({}),  // emit any value at start just to allow the stream to start
   r.switchMap(() => drag$)
);
const stopBrush$ = $escape.pipe(
   r.tap(() => {
      // do stuff to cancel
   })
);
merge(brush$, stopBrush$).subscribe();

The whole idea is that, any time $escape emits, the previous subscription to drag$ is unsubscribed and a new one starts. At the same time any logic to cancel what needs to be cancelled can be performed.
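
The resubscription behaviour can be checked with a small sketch. Subjects replace the real event streams, and the finalize callback only counts its calls; both are assumptions made for illustration, not part of the original code.

```typescript
import { Subject } from "rxjs";
import { finalize, startWith, switchMap, takeUntil } from "rxjs/operators";

// Hypothetical stand-ins for $pointerDown, $position, $pointerUp and $escape.
const pointerDown$ = new Subject<void>();
const position$ = new Subject<number>();
const pointerUp$ = new Subject<void>();
const escape$ = new Subject<void>();

const emitted: number[] = [];
let finalizeCalls = 0;

const drag$ = pointerDown$.pipe(
  switchMap(() =>
    position$.pipe(
      takeUntil(pointerUp$),
      // Counts every teardown of the inner stream: completion or unsubscription.
      finalize(() => { finalizeCalls += 1; }),
    ),
  ),
);

// Every escape$ emission drops the current drag$ subscription and starts a new one.
const brush$ = escape$.pipe(
  startWith(undefined),
  switchMap(() => drag$),
);

brush$.subscribe((value) => emitted.push(value));

pointerDown$.next();
position$.next(1);
escape$.next();     // cancels the stroke in progress
position$.next(2);  // ignored: waiting for the next pointer-down
pointerDown$.next();
position$.next(3);
pointerUp$.next();  // ends the second stroke normally

console.log(emitted, finalizeCalls);
```

One thing to keep in mind: finalize runs on unsubscription as well as on completion, so in this sketch it fires for the cancelled stroke too. A commit placed in finalize would therefore also run on Escape unless it is guarded in some way.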

I cannot test this, so I hope I have not forgotten anything.

Upvotes: 0

martin

Reputation: 96891

I think there's a pretty simple way to do that with toArray() and takeUntil(). I'm assuming that when you said you want to "commit" changes, you meant collecting all changes and processing them at once. Otherwise, the same approach would work with buffer() as well.

$pointerDown.pipe(
   switchMap(() => $position.pipe(
     throttleTime(150),
     map(getNodesUnderBrush),
     tap(prepareChanges), // ?
     takeUntil($pointerUp),
     toArray(),
     takeUntil($escape),
   ))
).subscribe(changes => {
  // ...
  commitBrushStroke(changes);
})

So the entire trick is whether you complete the inner chain before or after toArray(). When you complete it before toArray(), toArray() emits a single array of all the changes it has collected so far. If you complete it after toArray(), the chain is disposed, and toArray() discards everything and just unsubscribes.
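
A minimal sketch of the two completion paths, assuming Subjects in place of the real event streams and numbers in place of the brushed nodes (both hypothetical):

```typescript
import { Subject } from "rxjs";
import { switchMap, takeUntil, toArray } from "rxjs/operators";

// Hypothetical stand-ins for $pointerDown, $position, $pointerUp and $escape.
const pointerDown$ = new Subject<void>();
const position$ = new Subject<number>();
const pointerUp$ = new Subject<void>();
const escape$ = new Subject<void>();

const committed: number[][] = [];

pointerDown$
  .pipe(
    switchMap(() =>
      position$.pipe(
        takeUntil(pointerUp$), // completes BEFORE toArray: the buffer is emitted
        toArray(),
        takeUntil(escape$),    // completes AFTER toArray: the buffer is dropped
      ),
    ),
  )
  .subscribe((changes) => committed.push(changes));

// Stroke 1 ends with pointer-up, so it is committed.
pointerDown$.next();
position$.next(1);
position$.next(2);
pointerUp$.next();

// Stroke 2 is cancelled with Escape, so nothing is committed.
pointerDown$.next();
position$.next(3);
escape$.next();
pointerUp$.next(); // too late: the inner chain is already gone

console.log(committed);
```

Only the first stroke reaches the subscriber, as a single array.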

Upvotes: 0

Andrei Gătej

Reputation: 11934

Very interesting problem!

Here's my approach:

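import { fromEvent, merge, of } from 'rxjs';
import { filter, first, mapTo, switchMapTo } from 'rxjs/operators';

const down$ = fromEvent(div, 'mousedown');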
const up$ = fromEvent(div, 'mouseup');
const calc$ = of('calculations');
const esc$ = fromEvent(document, 'keyup')
  .pipe(
    filter((e: KeyboardEvent) => e.code === 'Escape')
  );

down$ // The source of the stream - mousedown events
  .pipe(
    switchMapTo(calc$.pipe( // Do some calculations
      switchMapTo(
        merge( // `merge()` - want to stop making the calculations when either `up$` or `esc$` emit
          up$.pipe(mapTo({ shouldCommit: true })),
          esc$.pipe(mapTo({ shouldCommit: false })),
        ).pipe(first()) // `first()` - either of these 2 can stop the calculations;
      )
    ))
  )
  .subscribe(console.log)

StackBlitz.

Upvotes: 0

Jonathan Stellwag

Reputation: 4267

Regarding your question, I can see you need some kind of state over time. Here your state is the pointerdown/move/dragging observable, which needs to be accumulated or cleared and finally emitted. When I see such a state scenario, I always like to use the scan operator:

Pre

For the sake of a simple example, I did not use your predefined observables. If you have issues adapting your specific pointer use case to this very similar one, I can try to update it so it is closer to your question.

1. What could represent my state

Here I am using an enum [status] to react later to the event that happened before, and an accumulation [acc] for the points over time.

interface State {
  acc: number[],
  status: Status
}

enum Status {
  init,
  move,
  finish,
  escape
}

const DEFAULT_STATE: State = {
  acc: [],
  status: Status.init
}

2. Write functions that mutate the state

Your requirement can be split into: accumulate [pointerdown$ + position$], finish [pointerup$], escape [escape$]

const accumulate = (index: number) => (state: State): State =>
({status: Status.move, acc: [...state.acc, index]});

const finish = () => (state: State): State =>
({status: Status.finish, acc: state.acc})

const escape = () => (state: State): State =>
({status: Status.escape, acc: []})

3. Map your functions to your observables

merge(
  move$.pipe(map(accumulate)),
  finish$.pipe(map(finish)),
  escape$.pipe(map(escape))
)

4. Use the functions in the scan where your state over time is placed

scan((acc: State, fn: (state: State) => State) => fn(acc), DEFAULT_STATE)

5. Process your mutated state

Here we only want to process if we have a finish, so we filter for it

filter(state => state.status === Status.finish),

Inner state sample

  1. move$.next(1) = State: {status: move, acc: [1]}
  2. escape$.next() = State: {status: escape, acc: []}
  3. move$.next(2) = State: {status: move, acc: [2]}
  4. finish$.next() = State: {status: finish, acc: [2]}
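
Because the function passed to scan is a plain reducer, the transitions in this sample can be replayed without any streams. The sketch below uses Array.prototype.reduce as a stand-in for scan, assumes numeric points, and renames the escape helper to escapeStroke (a hypothetical name, chosen only to avoid clashing with the global escape function).

```typescript
enum Status { init, move, finish, escape }

interface State {
  acc: number[];
  status: Status;
}

// A state update is a function from the previous state to the next one.
type Update = (state: State) => State;

const DEFAULT_STATE: State = { acc: [], status: Status.init };

const accumulate = (index: number): Update => (state) =>
  ({ status: Status.move, acc: [...state.acc, index] });

const finish = (): Update => (state) =>
  ({ status: Status.finish, acc: state.acc });

const escapeStroke = (): Update => () =>
  ({ status: Status.escape, acc: [] });

// Same event order as the sample: move(1), escape, move(2), finish.
const updates: Update[] = [accumulate(1), escapeStroke(), accumulate(2), finish()];

// reduce applies the updates in order like scan does, but keeps only the last state.
const finalState = updates.reduce((state, fn) => fn(state), DEFAULT_STATE);

console.log(Status[finalState.status], finalState.acc);
```

The point collected before the escape is dropped, and the final state carries status finish with only the point collected afterwards.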

Running stackblitz

FYI: It is pretty hard to get into this mindset of solving state problems with rxjs. But once you understand the flow behind the idea, you can use it in nearly every stateful scenario. You will avoid side effects, stick to the rxjs workflow, and be able to optimize and debug your code easily.

Upvotes: 1
