Reputation: 1580
I'm trying to construct an observable stream of pull requests from the Stash REST API. Unfortunately, the information on whether or not a PR has merge conflicts is available at a different endpoint from the list of pull requests.
The list of open pull requests is visible at, say, http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests
For each PR, the data on merge conflicts is visible at http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests/[PR-ID]/merge
Using the atlas-stash package, I can create and subscribe to an observable stream of pull requests (updated every second):
let pullRequestsObs = Rx.Observable.create(function(o) {
  stash.pullRequests(project, repo)
    .on('error', function(error) {o.onError(error)})
    .on('allPages', function(data) {
      o.onNext(data);
      o.onCompleted();
    });
});
let pullRequestStream = pullRequestsObs
  .take(1)
  .merge(
    Rx.Observable
      .interval(1000)
      .flatMapLatest(pullRequestsObs)
  );
pullRequestStream.subscribe(
  (data) => {
    console.log(data)
    // do something with data
  },
  (error) => log.error(error),
  () => log.info('done')
);
This works as I want and expect. In the end, pullRequestStream is an observable whose values are lists of JSON objects.
I would like the pullRequestStream values to be updated so that every element of the list includes information from the [PR-ID]/merge API.
I assume that this can be achieved using a map on pullRequestStream, but I'm not succeeding in doing this.
let pullRequestWithMergeStream = pullRequestStream.map(function(prlist) {
  _.map(prlist, function(pr) {
    let mergeObs = Rx.Observable.create(function(o) {
      stash.pullRequestMerge(project, repo, pr['id'])
        .on('error', function(error) {o.onError(error)})
        .on('newPage', function(data) {
          o.onNext(data);
          o.onCompleted();
        }).take(1);
    });
    mergeObs.subscribe(
      (data) => {
        pr['merge'] = data;
        return pr; // this definitely isn't right
      },
      (error) => log.error(error),
      () => log.info('done')
    );
  });
});
With a bit of logging, I can see that both the pull-request and the merge APIs are being hit correctly, but when I subscribe to pullRequestWithMergeStream I get undefined values.
Using return within the subscribe step within a map doesn't work (and doesn't seem like it should), but I can't figure out what pattern/idiom would achieve what I want.
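The undefined values seem to follow from how return works in JavaScript rather than from anything specific to Rx: a return inside a callback only returns from that callback, so the function passed to map never returns anything. A minimal sketch without any Rx or Stash dependency, where fakeSubscribe and the sample data are hypothetical stand-ins for mergeObs.subscribe and the real responses:

```javascript
// Hypothetical stand-in for mergeObs.subscribe: it calls the handler with
// some data and ignores whatever the handler returns.
function fakeSubscribe(onNext) {
  onNext({ canMerge: true });
}

var prlist = [{ id: 1 }, { id: 2 }];

// Same shape as the attempt above: the inner `return pr` only returns from
// the handler passed to fakeSubscribe, so the outer callback returns nothing.
var broken = prlist.map(function (pr) {
  fakeSubscribe(function (data) {
    pr.merge = data;
    return pr; // discarded by fakeSubscribe
  });
});

console.log(broken);          // [ undefined, undefined ]
console.log(prlist[0].merge); // { canMerge: true } (the mutation still happened)
```

So the requests do run and the objects are updated, but the mapped value handed downstream is undefined.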
Is there a correct way of doing this? Have I gone completely down the wrong track?
Can I update values from an Rxjs.Observable with information from a different observable?
Upvotes: 2
Views: 2578
Reputation: 18665
You could use flatMap or concatMap to have one task trigger another one. You could use forkJoin to request the merges in parallel and collect the results in one place. It is not tested, but it should go like this:
pullRequestStream.concatMap(function (prlist) {
  var arrayRequestMerge = prlist.map(function (pr) {
    return Rx.Observable.create(function (o) {...same as your code});
  });
  return Rx.Observable.forkJoin(arrayRequestMerge)
    .do(function (arrayData) {
      prlist.forEach(function (pr, index) { pr['merge'] = arrayData[index]; });
    })
    .map(function () { return prlist; });
});
PS: I assumed prlist was an array.
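To make the forkJoin step more concrete, here is a dependency-free sketch of the same idea using plain callbacks instead of Rx: start every request, store each result at its request's index, and hand back the list once all of them have arrived. joinAll and fakeMerge are hypothetical names; fakeMerge stands in for stash.pullRequestMerge and answers synchronously only to keep the example short.

```javascript
// Hypothetical helper: run every task, collect results by index, and call
// `done` once when all of them have reported back (roughly what forkJoin
// does for observables that each emit a single value).
function joinAll(tasks, done) {
  var results = new Array(tasks.length);
  var pending = tasks.length;
  if (pending === 0) { done(results); return; }
  tasks.forEach(function (task, index) {
    task(function (value) {
      results[index] = value;
      pending -= 1;
      if (pending === 0) { done(results); }
    });
  });
}

// Hypothetical stand-in for the merge endpoint.
function fakeMerge(id, callback) {
  callback({ conflicted: id % 2 === 0 });
}

var prs = [{ id: 1 }, { id: 2 }, { id: 3 }];
var tasks = prs.map(function (pr) {
  return function (callback) { fakeMerge(pr.id, callback); };
});

var enriched = null;
joinAll(tasks, function (arrayData) {
  // Same step as the .do() above: attach each result to its PR by index.
  prs.forEach(function (pr, index) { pr.merge = arrayData[index]; });
  enriched = prs;
});
console.log(enriched);
```

The point is that the list is only passed on after every merge result is attached, which is what the forkJoin, do, and map chain is meant to achieve.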
UPDATE
Following your comment, here is a version that will run at most maxConcurrent calls in parallel.
pullRequestStream.concatMap(function (prlist) {
  var arrayRequestMerge = prlist.map(function (pr, index) {
    return Rx.Observable.create(function (o) {
      stash.pullRequestMerge(project, repo, pr['id'])
        .on('error', function (error) { o.onError(error); })
        .on('newPage', function (data) {
          // Tag each result with its index so it can be matched back to its PR
          o.onNext({data: data, index: index});
          o.onCompleted();
        });
    }).take(1);
  });
  var maxConcurrent = 2;
  return Rx.Observable.from(arrayRequestMerge)
    .merge(maxConcurrent)
    .do(function (obj) {
      prlist[obj.index]['merge'] = obj.data;
    })
    .toArray() // wait for every merge result, then emit the list once
    .map(function () { return prlist; });
});
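The reason for tagging each result with its index is that, with limited concurrency, results need not arrive in request order. As a rough, dependency-free illustration of what merge(maxConcurrent) is doing here (plain callbacks, not Rx), the sketch below keeps at most two fake requests in flight. runLimited, waiting, and the fake tasks are all hypothetical names made up for the example.

```javascript
// Hypothetical helper: run at most `maxConcurrent` tasks at a time. Each task
// receives a callback; its result is stored at the task's index.
function runLimited(tasks, maxConcurrent, done) {
  var results = new Array(tasks.length);
  var next = 0;      // index of the next task to start
  var active = 0;    // tasks currently in flight
  var finished = 0;  // tasks that have reported back

  function launch(index) {
    active += 1;
    tasks[index](function (value) {
      results[index] = value;
      active -= 1;
      finished += 1;
      if (finished === tasks.length) { done(results); } else { fill(); }
    });
  }

  // Start tasks until the concurrency limit is reached or none are left.
  function fill() {
    while (active < maxConcurrent && next < tasks.length) {
      next += 1;
      launch(next - 1);
    }
  }

  if (tasks.length === 0) { done(results); } else { fill(); }
}

// Demo with fake requests that stay "in flight" until the loop below
// answers them.
var waiting = [];  // started but unanswered requests
var peak = 0;      // highest number of simultaneous requests observed
var prs = [{ id: 10 }, { id: 11 }, { id: 12 }, { id: 13 }, { id: 14 }];

var tasks = prs.map(function (pr) {
  return function (callback) {
    waiting.push(function () { callback({ forPr: pr.id }); });
    peak = Math.max(peak, waiting.length);
  };
});

var enriched = null;
runLimited(tasks, 2, function (results) {
  prs.forEach(function (pr, index) { pr.merge = results[index]; });
  enriched = prs;
});

// Answer the outstanding requests one at a time until none remain.
while (waiting.length > 0) {
  waiting.shift()();
}

console.log(peak); // 2
console.log(enriched.map(function (pr) { return pr.merge.forPr; })); // [ 10, 11, 12, 13, 14 ]
```

Each time a request finishes, the next one is started, so the number in flight never exceeds the limit and every result still lands on the right PR.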
Upvotes: 2