Evgeni Petrov
Evgeni Petrov

Reputation: 1383

How to filter one observable with another observable?

With https://github.com/neuecc/UniRx,

I have two observables A and B.

I want A to be filtered by B. Sample seems like what I want but the negative of it.

IObservable<long> A = Observable.EveryUpdate();            
IObservable<Collider2D> B = this.OnTriggerEnter2DAsObservable()
                .Where( x => x.gameObject.tag == "Wall");

I want some kind of Pseudo code like that:

A.filterBy(B)
 .Subscribe(x => Debug.Log(x)); //executed only when B is not streaming

(Update1) Here is actual code. I am trying to cancel out input stream with colliding stream.

    var isCollidingWithWall = this.OnTriggerEnter2DAsObservable()
        .Where(collider => collider.gameObject.tag == "Wall");

    Func<long, float> displaceCalculate = (_) => this.Speed * Time.deltaTime;

    var moveLeft = Observable.EveryUpdate()
        .Where(_ => Input.GetKey(KeyCode.A) || Input.GetKey(KeyCode.LeftArrow));
    var moveRight = Observable.EveryUpdate()
        .Where(_ => Input.GetKey(KeyCode.D) || Input.GetKey(KeyCode.RightArrow));

    var movement1 = moveLeft
        .Select(displaceCalculate)
        .Select(f => -f);

    var movement2 = moveRight
        .Select(displaceCalculate);

    movement2
        .Merge(movement1)
        .Subscribe(f =>
        {
            this.transform.position = new Vector2(this.transform.position.x + f, this.transform.position.y);
        });

I think I might be going in wrong direction.

Upvotes: 1

Views: 1160

Answers (3)

torisoup
torisoup

Reputation: 21

I just came up with another way.

    var streamB = this.OnTriggerEnter2DAsObservable().AsUnitObservable();

    this.UpdateAsObservable()
        .TakeUntil(streamB)
        .RepeatUntilDestroy(this)
        .Subscribe(_ =>
        {
            Debug.Log(Time.frameCount);
        });

Upvotes: 1

Urs Hanselmann
Urs Hanselmann

Reputation: 9

Can you provide a little more context about the actual game behavior you are trying to implement?

My guess would be that there is some other approach to what you are trying to do, without having to rely on EveryUpdate (e.g. by using OnTriggerStay and/or OnTriggerExit).

Just giving a guess to what you mean by "negative" of the sample operator: you might want to have a look at pausable. You'd have to generate the proper boolean values though, and how to do that really depends on what game behavior you are actually trying to implement here.

Upvotes: 0

torisoup
torisoup

Reputation: 21

It is difficult to only combine operators.

The two streams are not synchronized. When the OnNext message comes from stream B, how long shut off stream A? Next stream B message? or Next stream A?

If you want to stop it only one frame, how about this?

void Start()
{
    var isCollisionEntered = false;

    this.OnCollisionEnter2DAsObservable()
        .Where(x => x.gameObject.tag == "Wall")
        .Subscribe(_ => isCollisionEntered = true);

    this.LateUpdateAsObservable()
        .Where(_ => isCollisionEntered)
        .Subscribe(_ => isCollisionEntered = false);

    this.UpdateAsObservable()
        .Where(_ => !isCollisionEntered)
        .Subscribe(_ => Debug.Log("Do here"));
}

And, I don't recommend Observable.EveryUpdate .It is necessary to manage lifetime.

I recommend using this.UpdateAsObservable (UniRx.Triggers) instead. It automatically publishes OnCompleted message on the gameobject destroyed.

Upvotes: 1

Related Questions