laffoyb
laffoyb

Reputation: 1580

Rxjs: updating values in observable stream with data from another observable, returning a single observable stream

Background

I'm trying to construct an observable stream of values from the Stash Rest Api of pull requests. Unfortunately, the information of whether or not a PR has merge conflicts is available at a different endpoint to the list of merges.

The list of open pull requests is visible at, say, http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests

For each PR, the data on merge conflicts is visible at http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests/[PR-ID]/merge

Using the atlas-stash package, I can create and subscribe to an observable stream of pull requests (updated every second):

let pullRequestsObs = Rx.Observable.create(function(o) {
    stash.pullRequests(project, repo)
        .on('error', function(error) {o.onError(error)})
        .on('allPages', function(data) {
            o.onNext(data);
            o.onCompleted();
        });
    });

let pullRequestStream = pullRequestsObs
    .take(1)
    .merge(
        Rx.Observable
            .interval(1000)
            .flatMapLatest(pullRequestsObs)
    );

pullRequestsStream.subscribe(
    (data) => {
        console.log(data)
        // do something with data
    },
    (error) => log.error(error),
    () => log.info('done')
);

This works as I want and expect. In the end, the pullRequestsStream is an observable whose values are lists of JSON objects.

My Goal

I would like the pullRequestsStream values to be updated so every element of the list includes information from the [PR-ID]/merge api.

I assume that this can be achieved using a map on pullRequestsStream, but I'm not succeeding in doing this.

let pullRequestWithMergeStream = pullRequestStream.map(function(prlist) {
    _.map(prlist, function(pr) {
        let mergeObs = Rx.Observable.create(function(o) {
            stash.pullRequestMerge(project, repo, pr['id'])
                .on('error', function(error) {o.onError(error)})
                .on('newPage', function(data) {
                    o.onNext(data);
                    o.onCompleted();
                }).take(1);
        });

        mergeObs.subscribe(
            (data) => {
                pr['merge'] = data;
                return pr; // this definitely isn't right
            },
            (error) => log.error(error),
            () => log.info('done')
        );
    });
});

With a bit of logging, I can see that both the pull-request and the merge apis are being hit correctly, but when I subscribe to pullRequestWithMergeStream I get undefined values.

Using return within the the subscribe step within a map doesn't work (and doesn't seem like it should) but I can't figure out what pattern/idiom would achieve what I want.

Is there a correct way of doing this? Have I gone completely down the wrong track?

tl;dr

Can I update values from an Rxjs.Observable with information from a different observable?

Upvotes: 2

Views: 2578

Answers (1)

user3743222
user3743222

Reputation: 18665

You could use flatMap or concatMap to have one task trigger another one. You could use forkJoin to request the merges in parallel and collect the result in one place. It is not tested, but it should go like this :

pullRequestStream.concatMap(function (prlist){
  var arrayRequestMerge = prlist.map(function(pr){
    return Rx.Observable.create(function(o) {...same as your code});
  });
  return Rx.Observable.forkJoin(arrayRequestMerge)
         .do(function(arrayData){
               prlist.map(function(pr, index){pr['merge']=arrayData[index]
             })})
         .map(function(){return prlist})
})

PS : I supposed prlist was an array.

UPDATE Following your comment, here is a version that will run only maxConcurrent calls in parallels.

pullRequestStream.concatMap(function (prlist){
  var arrayRequestMerge = prlist.map(function(pr, index){
    return Rx.Observable.create(function(o) {
        stash.pullRequestMerge(project, repo, pr['id'])
            .on('error', function(error) {o.onError(error)})
            .on('newPage', function(data) {
                o.onNext({data: data, index : index});
                o.onCompleted();
            }).take(1);
    });
  });
  var maxConcurrent = 2;
  Rx.Observable.from(arrayRequestMerge)
    .merge(maxConcurrent)
    .do(function(obj){
               prlist[obj.index]['merge'] = obj.data
             })})
    .map(function(){return prlist})
})

Upvotes: 2

Related Questions