Reputation: 4545
I have 2 services in my application: a youTube service that loads youTube comment threads and comments over the wire, and a comment service that manages the loading of batches of comments. The comment service is specific to my app; the youTube service is app agnostic.
I have a function, getCommentThreadsForChannel, that loads comment threads. The main implementation is in the youTube service, but it is called through the comment service, which basically just returns the observable from the youTube service.
As far as my controller that calls this is concerned this is just an observable sequence of comment threads. However, in my commentService I want to store these threads locally. I want to batch this into storing all the threads whenever I get 100 more threads so that I am not processing the list for every single new bit of data. I have come up with this code:
getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
    var threadStream: Rx.Observable<ICommentThread> =
        this.youTubeService.getCommentThreadsForChannel();
    threadStream
        .bufferWithCount(100)
        .scan( ( allItems, currentItem ) => {
            currentItem.forEach(thread => {
                allItems.push(thread);
            });
            console.log( `Save items to local storage: ${allItems.length}` )
            return allItems;
        }, [] );
    return threadStream;
}
I think the logic here for batching the threads and for accumulating them all into one array is fine, but this code is never called. I presume that this is because I am not subscribing to the buffered stream at all.
I do not want to subscribe in here as that would subscribe to the underlying stream and I would then have 2 subscriptions and all the data would load twice (there is a LOT of data - it takes about a minute to load it all and over 30 calls of 100 threads at a time).
I basically want a do here that will not affect the stream that is passed to the controller, but I want to use the buffering and accumulating logic of RxJS.
I presume that I need to share or publish the stream in some way, but I have had little success using these operators before and can't see how I would do it without adding a second subscribe.
How do I share one stream and use it in 2 different ways without subscribing twice? Can I create some sort of passive stream that only subscribes when the observable it is based on subscribes?
Upvotes: 1
Views: 802
Reputation: 4545
I figured this out in the end (with some help from @MonkeyMagiic).
I had to share the stream so that I could do 2 different things with the data: buffer it AND act on each value individually. The problem was that both of these streams had to be subscribed to, but I didn't want to subscribe in the service; that should be done in the controller.
The solution was to combine the 2 streams again and ignore values from the buffer:
var intervalStream = Rx.Observable.interval(250)
    .take(8)
    .do( function(value){console.log( "source: " + value );} )
    .shareReplay(1);
var bufferStream = intervalStream.bufferWithCount(3)
    .do( function(values){
        console.log( "do something with buffered values: " + values );
    } )
    .flatMap( function(values){ return Rx.Observable.empty(); } );
var mergeStream = intervalStream.merge( bufferStream );
mergeStream.subscribe(
    function( value ){ console.log( "value received by controller: " + value ); },
    function( error ){ console.log( "error: " + error ); },
    function(){ console.log( "onComplete" ); }
);
Output:
"source: 0"
"value received by controller: 0"
"source: 1"
"value received by controller: 1"
"source: 2"
"value received by controller: 2"
"do something with buffered values: 0,1,2"
"source: 3"
"value received by controller: 3"
"source: 4"
"value received by controller: 4"
"source: 5"
"value received by controller: 5"
"do something with buffered values: 3,4,5"
"source: 6"
"value received by controller: 6"
"source: 7"
"value received by controller: 7"
"do something with buffered values: 6,7"
"onComplete"
Upvotes: 1
Reputation: 138
I believe what you are looking for is a combination of share and replay. Thankfully, Rx has shareReplay(bufferSize).
var threadStream: Rx.Observable<ICommentThread> = this.youTubeService
    .getCommentThreadsForChannel()
    .shareReplay(1);
getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
    threadStream
        .bufferWithCount(100)
        .scan( ( allItems, currentItem ) => {
            currentItem.forEach(thread => {
                allItems.push(thread);
            });
            console.log( `Save items to local storage: ${allItems.length}` )
            return allItems;
        }, [] );
    return threadStream;
}
Using the multicast operator share ensures that each additional subscription after the first does not make an unnecessary request, and the replay ensures that all future subscriptions receive the last notification.
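To make the share half of that concrete, here is a minimal hand-rolled sketch of multicasting with no RxJS dependency. It is only an illustration of the idea, not how RxJS implements share: it leaves out unsubscription, errors, completion, and replay, and every name in it (Source, Listener, shareSource, coldSource, emit) is hypothetical.

```typescript
type Listener<T> = (value: T) => void;
// A "cold" source: every call starts the underlying work again.
type Source<T> = (listener: Listener<T>) => void;

// Hypothetical stand-in for share(): subscribes to the source once, on
// the first subscriber, and forwards each value to all listeners.
function shareSource<T>(source: Source<T>): Source<T> {
    const listeners: Listener<T>[] = [];
    let connected = false;
    return (listener: Listener<T>): void => {
        listeners.push(listener);
        if (!connected) {
            connected = true;
            source(value => listeners.forEach(l => l(value)));
        }
    };
}

// Demo: a source that counts how often it is started and exposes a
// manual `emit`, so values arrive after both listeners are attached.
let requestCount = 0;
let emit: Listener<number> = () => {};
const coldSource: Source<number> = listener => {
    requestCount += 1; // stands in for the expensive YouTube request
    emit = listener;
};

const shared = shareSource(coldSource);
const controllerValues: number[] = [];
const storageValues: number[] = [];
shared(v => controllerValues.push(v));
shared(v => storageValues.push(v));
emit(1);
emit(2);
console.log(requestCount, controllerValues, storageValues);
```

Here requestCount ends up as 1 even though there are two listeners, and both arrays receive 1 and 2. A listener attached after emit(1) would miss that value in this sketch, which is the gap the replay part is meant to cover.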
Upvotes: 0