laffoyb
laffoyb

Reputation: 1580

RxJS Observable fire onCompleted after a number of async actions

I'm trying to create an observable that produces values from a number of asynchronous actions (http requests from a Jenkins server), that will let a subscriber know once all the actions are completed. I feel like I must be misunderstanding something because this fails to do what I expect.

'use strict';

let Rx = require('rx');
let _ = require('lodash');
// Sample items the example observable emits, one per simulated asynchronous action.
let values = [
    {'id': 1, 'status': true},
    {'id': 2, 'status': true},
    {'id': 3, 'status': true}
];

function valuesObservable() {
    // Builds an observable that schedules one delayed emission per entry of
    // `values` and then signals completion.
    const delayMs = 1000;

    return Rx.Observable.create((observer) => {
        _.forEach(values, (value) => {
            // setTimeout only stands in for a real asynchronous action in this example
            setTimeout(() => {
                console.log("Sending value: ", value);
                observer.onNext(value);
            }, delayMs);
        });

        // This runs synchronously, i.e. before any of the timers scheduled
        // above has fired, so completion is signalled ahead of every value.
        console.log("valuesObservable Sending onCompleted");
        observer.onCompleted();
    });
}

// Observer that logs every notification it receives: values, errors and completion.
let observer = Rx.Observer.create(
    function onData(data) {
        console.log("Received Data: ", data);
        // do something with the info
    },
    function onFailure(error) {
        console.log("Error: ", error);
    },
    function onDone() {
        console.log("DONE!");
        // do something else once done
    }
);

valuesObservable().subscribe(observer);

Running this, I get output:

valuesObservable Sending onCompleted
DONE!
Sending value:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Sending value:  { id: 3, status: true }

While what I would like to see is something more like:

Sending value:  { id: 1, status: true }
Received Data:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Received Data:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
Received Data:  { id: 3, status: true }
valuesObservable Sending onCompleted
DONE!

I don't actually care about the order of the items in the list, I would just like the observer to receive them.

I believe what is happening is that Javascript asynchronously fires the timeout function, and proceeds immediately to the observer.onCompleted() line. Once the subscribing observer receives the onCompleted event (is that the right word?), it decides that it's done and disposes of itself. Then when the async actions complete and the observable fires onNext, the observer no longer exists to take any actions with them.

If I'm right about this, I'm still stumped about how to make it behave in the way I would like. Have I stumbled into an antipattern without realising it? Is there a better way of approaching this whole thing?


Edit:

Since I used setTimeout to construct my example, I realised I can use it to partially solve my problem by giving the observable a timeout.

function valuesObservable() {
    // Same example as before, but completion is deferred by a fixed timeout
    // instead of being signalled immediately.
    const completionDelayMs = 10000;
    const valueDelayMs = 1000;

    return Rx.Observable.create((observer) => {
        // Completion is tied to a fixed delay, not to the values themselves:
        // anything that would arrive after this timer fires comes too late.
        setTimeout(() => {
            console.log("valuesObservable Sending onCompleted");
            observer.onCompleted();
        }, completionDelayMs);

        _.forEach(values, (value) => {
            setTimeout(() => {
                console.log("Sending value: ", value);
                observer.onNext(value);
            }, valueDelayMs);
        });
    });
}

This gets me all of the information from the observable in the order I want (data, then completion), but depending on the choice of timeout, I may either miss some data or have to wait a long time for the completion event. Is this just an inherent problem of asynchronous programming that I have to live with?

Upvotes: 1

Views: 2304

Answers (3)

laffoyb
laffoyb

Reputation: 1580

With thanks to @paulpdaniels, this is the final code that did what I wanted, including the calls to Jenkins:

'use strict';

let Rx = require('rx');
let jenkinsapi = require('jenkins'); // https://github.com/silas/node-jenkins/issues
// Settings handed to the Jenkins client factory. The `job` entry is also read
// when the final observable is built. The credential values here look like
// placeholders — NOTE(review): keep real tokens out of source.
let jenkinsOpts = {
    "baseUrl": "http://localhost:8080",
    "options": {"strictSSL": false},
    "job": "my-jenkins-job",
    "username": "jenkins",
    "apiToken": "f4abcdef012345678917a"
};
// The client is given a JSON round-trip copy of the settings rather than the
// object itself — presumably so the library cannot modify `jenkinsOpts`; TODO confirm.
let jenkins = jenkinsapi(JSON.parse(JSON.stringify(jenkinsOpts)));

function jobInfoObservable(jenkins, jobName) {
    // Returns an observable for a single `jenkins.job.get` request for the
    // given job; the request asks only for each build's number and url.
    const buildListFilter = {tree: 'builds[number,url]'};

    // Adapt the node-style (callback-last) API call into an observable factory.
    const fetchJob = Rx.Observable.fromNodeCallback((callback) => {
        jenkins.job.get(jobName, buildListFilter, callback);
    });

    return fetchJob();
}

function buildIDObservable(jenkins, jobName) {
    // Returns an observable that emits the entries of the job's `builds`
    // list one at a time.
    const jobInfoStream = jobInfoObservable(jenkins, jobName);

    // Flatten the job-info payload into one emission per build entry.
    return jobInfoStream.flatMap((jobInfo) => Rx.Observable.from(jobInfo.builds));
}

function buildInfoObservable(jenkins, jobName) {
    // Returns an observable that emits the result of one `jenkins.build.get`
    // request per build emitted by buildIDObservable for this job.
    const buildFieldFilter = {'tree': 'actions[parameters[name,value]],building,description,displayName,duration,estimatedDuration,executor,id,number,result,timestamp,url'};

    // Turn one build entry into an observable of that build's details.
    const fetchBuild = (build) => {
        const request = Rx.Observable.fromNodeCallback((callback) => {
            jenkins.build.get(jobName, build.number, buildFieldFilter, callback);
        });
        return request();
    };

    return buildIDObservable(jenkins, jobName).flatMap((build) => fetchBuild(build));
}

// Observer that logs each build-info result, any error, and the completion signal.
let observer = Rx.Observer.create(
    function onData(data) {
        console.log("Received Data: ", data);
        // do something with the info
    },
    function onFailure(error) {
        console.log("Error: ", error);
    },
    function onDone() {
        console.log("DONE!");
        // do something else once done
    }
);

buildInfoObservable(jenkins, jenkinsOpts.job).subscribe(observer);

By relying on the Rx built-in operators I managed to avoid messing about with timing logic altogether. This is also much cleaner than nesting multiple Rx.Observable.create statements.

Upvotes: 0

paulpdaniels
paulpdaniels

Reputation: 18663

Yes there is a better way. The problem right now is that you are relying on time delays for your synchronization when in fact you can use the Observable operators to do so instead.

The first step is to move away from directly using setTimeout. Instead, use timer:

// Observable replacement for the setTimeout call; `waitTime` is the delay and
// is assumed to be defined by the surrounding code.
Rx.Observable.timer(waitTime);

Next you can lift the values array into an Observable such that each value is emitted as an event by doing:

// Turn the array into an Observable that emits each element as its own event.
Rx.Observable.from(values);

And finally you would use flatMap to convert those values into Observables and flatten them into the final sequence. The result being an Observable that emits each time one of the source timers emits, and completes when all the source Observables complete.

Rx.Observable.from(values)
  .flatMap(
    // Map each value into a timer stream (`waitTime` is assumed to be
    // defined by the surrounding code)
    value => Rx.Observable.timer(waitTime),
    // Result selector: ignores what the timer Observable emitted and
    // hands back the original value you wanted to emit
    value => value
  )

Thus the complete valuesObservable function would look like:

function valuesObservable(values) {
  // Returns an Observable that emits each entry of `values` when that entry's
  // timer fires, and completes once all of the timers have completed.
  // `waitTime` (the timer delay) is expected to be defined in the enclosing scope.
  return Rx.Observable.from(values)
    .flatMap(
      // Map each value into a timer stream standing in for the async action
      value => Rx.Observable.timer(waitTime),
      // Result selector: drop the timer's own emission, keep the original value
      value => value
    )
    .do(
      // Log the item actually being emitted. The callback parameter is `x`;
      // there is no `value` in scope here. Passing it as a separate argument
      // prints the object's contents instead of "[object Object]".
      x => console.log('Sending value: ', x),
      null,
      () => console.log('Sending values completed')
    );
}

Note that the above would work just as well if you weren't using a demo stream, i.e. if you had real HTTP streams. In that case you could even simplify by using merge (or concat to preserve order):

// Flatten by mapping every inner stream to itself
Rx.Observable.from(streams)
    .flatMap(stream => stream);

// OR flatten the stream of streams with the mergeAll operator
Rx.Observable.from(streams).mergeAll();

// Or simply merge the array of streams with the static merge
Rx.Observable.merge(streams);

Upvotes: 2

user3743222
user3743222

Reputation: 18665

The best way to construct an observable is to use the existing primitive and then a combination of the existing operators. This avoids a few headaches (unsubscription, error management etc.). Then Rx.Observable.create is certainly useful when nothing else fits your use case. I wonder if generateWithAbsoluteTime would fit.

Anyway, the issue you are running into here is that you complete your observer before you send it any data. So basically you need to come up with a better completion signal. Maybe:

  • complete x seconds after last value emitted if no new value is emitted
  • complete when a value is equal to some 'end' value

Upvotes: 0

Related Questions