tower120

Reputation: 5265

merge performance in Functional Reactive programming (RX)

In the following code:

http://jsfiddle.net/staltz/4gGgs/27/

var clickStream = Rx.Observable.fromEvent(button, 'click');

var multiClickStream = clickStream
    .buffer(function() { return clickStream.throttle(250); })
    .map(function(list) { return list.length; })
    .filter(function(x) { return x > 1; });

// Same as above, but detects single clicks
var singleClickStream = clickStream
    .buffer(function() { return clickStream.throttle(250); })
    .map(function(list) { return list.length; })
    .filter(function(x) { return x === 1; });

// Listen to both streams and render the text label accordingly
singleClickStream.subscribe(function (event) {
    document.querySelector('h2').textContent = 'click';
});
multiClickStream.subscribe(function (numclicks) {
    document.querySelector('h2').textContent = ''+numclicks+'x click';
});
Rx.Observable.merge(singleClickStream, multiClickStream)
    .throttle(1000)
    .subscribe(function (suggestion) {
        document.querySelector('h2').textContent = '';
    });

How many times will the clickStream sequence be iterated after the merge? That is, will it look like this:

case 1

     for(numclicks : clickStream.length){
        if (numclicks === 1){ 
            document.querySelector('h2').textContent = 'click';
        }
     };
     for(numclicks : clickStream.length){
        if (numclicks > 1){ 
            document.querySelector('h2').textContent = ''+numclicks+'x click';
        }
     };

Or will it be merged internally into something like this (pseudocode):

case 2

    for(numclicks: clickStream.length){
        if (numclicks === 1){ 
            document.querySelector('h2').textContent = 'click';
        }else if(numclicks > 1){
            document.querySelector('h2').textContent = ''+numclicks+'x click';
        }
     }

Personally, I think that merge just applies the stream to each of its arguments sequentially (case 1).

P.S. I hope there is some standard for things like this. If not, I am particularly interested in the RxCpp and Sodium implementations. I used a JS example because it is more interactive.

Upvotes: 0

Views: 256

Answers (1)

Kirk Shoop

Reputation: 1294

fromEvent returns a hot source, so all subscribers share the same iteration of the for loop.

Ignoring the throttle calls, the result is similar to:

for(numclicks: clickStream.length){

    // first subscription
    if (numclicks === 1){ 
        document.querySelector('h2').textContent = 'click';
    }

    // second subscription
    if(numclicks > 1){
        document.querySelector('h2').textContent = ''+numclicks+'x click';
    }

    // merged subscription
    if (numclicks === 0) {
        document.querySelector('h2').textContent = '';
    }
 }
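To make the shared iteration concrete, here is a minimal sketch of a hot source written by hand, without Rx. `createHotSource`, `emit`, and `iterationCount` are hypothetical names invented for this illustration, not Rx APIs, and the values pushed are click counts (as in the pseudocode above) rather than raw click events. The merged subscription is modelled as "anything either branch lets through".

```javascript
// Hypothetical hand-rolled hot source: one producer, many observers.
// This is an illustration only, not how Rx is implemented internally.
function createHotSource() {
    var observers = [];
    var iterations = 0; // how many times the single "loop body" ran

    return {
        subscribe: function (observer) {
            observers.push(observer);
        },
        emit: function (value) {
            // One iteration of the source fans out to every subscriber.
            iterations += 1;
            observers.forEach(function (observer) {
                observer(value);
            });
        },
        iterationCount: function () {
            return iterations;
        }
    };
}

var source = createHotSource();
var log = [];

// first subscription: single clicks
source.subscribe(function (numclicks) {
    if (numclicks === 1) { log.push('click'); }
});

// second subscription: multiple clicks
source.subscribe(function (numclicks) {
    if (numclicks > 1) { log.push(numclicks + 'x click'); }
});

// merged subscription: sees whatever either branch lets through
source.subscribe(function (numclicks) {
    if (numclicks >= 1) { log.push('merged saw ' + numclicks); }
});

// Two source events, so the shared loop body runs exactly twice.
[1, 3].forEach(function (numclicks) {
    source.emit(numclicks);
});

console.log(source.iterationCount()); // 2
console.log(log);
```

Adding a fourth subscriber would add another callback inside each iteration, but it would not add another pass over the source.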

The throttle calls mean that the body of the sole click stream for loop actually just pushes click events into the two buffers and resets the timer in each of the three throttle operators. h2 is set when one of the three throttle timers fires. Since the timers are not shared, it is like having a separate for loop per throttle timer, with each loop setting h2 to only one of the three possible values.
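The timer behaviour can be sketched with a hand-written virtual clock so that it runs deterministically. Everything here is an assumption made for illustration: `createClock`, `bufferUntilQuiet`, and `setH2` are hypothetical helpers, not Rx APIs, and throttle is assumed to fire after a quiet period (each new value resets the timer), which is what the description above implies. The helper also folds buffer and throttle into one step for brevity.

```javascript
// Hypothetical virtual clock: timers fire only when we advance time.
function createClock() {
    var now = 0;
    var pending = [];
    return {
        now: function () { return now; },
        schedule: function (delay, callback) {
            var timer = { due: now + delay, callback: callback, cancelled: false };
            pending.push(timer);
            return timer;
        },
        advanceTo: function (time) {
            while (true) {
                // Re-sort every pass because callbacks may schedule new timers.
                pending.sort(function (a, b) { return a.due - b.due; });
                if (pending.length === 0 || pending[0].due > time) { break; }
                var timer = pending.shift();
                now = timer.due;
                if (!timer.cancelled) { timer.callback(); }
            }
            now = time;
        }
    };
}

// Hypothetical operator: collect values, flush after `delay` ms of quiet.
// Each instance owns its own timer; nothing is shared between instances.
function bufferUntilQuiet(clock, delay, onQuiet) {
    var timer = null;
    var buffered = [];
    return function push(value) {
        buffered.push(value);
        if (timer) { timer.cancelled = true; } // reset this operator's timer
        timer = clock.schedule(delay, function () {
            var batch = buffered;
            buffered = [];
            timer = null;
            onQuiet(batch);
        });
    };
}

var clock = createClock();
var h2Log = [];
function setH2(text) {
    h2Log.push('t=' + clock.now() + ' ' + JSON.stringify(text));
}

// Third timer: clears the label 1000 ms after the last single/multi result.
var pushMerged = bufferUntilQuiet(clock, 1000, function () { setH2(''); });
// First timer: single-click branch.
var pushSingle = bufferUntilQuiet(clock, 250, function (batch) {
    if (batch.length === 1) { setH2('click'); pushMerged(1); }
});
// Second timer: multi-click branch.
var pushMulti = bufferUntilQuiet(clock, 250, function (batch) {
    if (batch.length > 1) { setH2(batch.length + 'x click'); pushMerged(batch.length); }
});

// The single click "loop body": push into both buffers, resetting both timers.
function click(time) {
    clock.advanceTo(time);
    pushSingle('click');
    pushMulti('click');
}

click(0);
click(100);            // double click
clock.advanceTo(2000); // let all timers run

console.log(h2Log); // [ 't=350 "2x click"', 't=1350 ""' ]
```

The click handler itself never touches h2. Only the timer callbacks do, and each of them can write only its own value.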

This behavior is similar in all the Rx family.

Regarding rxcpp in particular:

rxcpp is missing the buffer overload that allows an observable to trigger a transition to a new buffer, and it does not have throttle implemented yet. rxcpp is also not thread-safe by default (pay-for-play), so if the throttle timers introduce threads, then coordinations must be used to add thread-safety explicitly.

Upvotes: 1
