Reputation: 3261
I have the code below, which emits items from a collection.
// fills the given coll with some items and
// returns the observable emitting the filled collection
// does ASYNC work
Observable<Collection> generate(Collection coll){ ... }

def coll = []
generate(coll).flatMap({ filledColl ->
    rx.Observable.from(filledColl)
}).subscribe({ ... })
The problem is that this collection can contain thousands of items, and since generate works asynchronously, this code causes the subscriber's callback to be invoked thousands of times almost instantly (which is not wanted for the work I'm doing inside the observer).
How can I modify this code to emit items from the collection with a delay? For example: emit 100 items, wait 100 ms, then emit another 100 items; or wait 10 ms before emitting each next item?
Upvotes: 2
Views: 1397
Reputation: 14004
Inside the flatMap, you need to split the filledColl into smaller parts, delay each part, and merge them all into one observable, which you will return from the flatMap.
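import java.util.concurrent.TimeUnit

generate(coll).flatMap({ filledColl ->
    def chunkSize = 100
    // start from empty() rather than never(), so the merged stream can complete
    def resultStream = rx.Observable.empty()
    // number of chunks, rounded up so a partial last chunk is included
    def numChunks = (filledColl.size() + chunkSize - 1).intdiv(chunkSize)
    for (i in 0 ..< numChunks) {
        // exclusive upper bound, clamped to the collection size
        def end = Math.min((i + 1) * chunkSize, filledColl.size())
        def chunk = filledColl[i * chunkSize ..< end]
        resultStream = resultStream.mergeWith(
            // chunk i is emitted after i * 100 ms
            rx.Observable.from(chunk).delay(100 * i, TimeUnit.MILLISECONDS)
        )
    }
    resultStream
}).subscribe({ ... })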
That's just the rough idea; you might still want to test, tweak, and correct it according to your needs. Also, it might make more sense to move this into the generate function, but that's up to you, since I cannot know what is in generate().
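If you want to check the chunk boundaries and delays separately from RxJava, here is a minimal sketch in plain Java (callable from Groovy). The names ChunkDelays, Planned and plan are hypothetical, made up for illustration; they are not part of RxJava or of your generate(). It only computes which items would go out after which delay, assuming a fixed chunk size and a fixed step between chunks. It does not emit anything itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: plans which slice of a list is emitted after which delay.
public class ChunkDelays {

    /** One planned emission: a slice of the input and the delay before it goes out. */
    public static final class Planned<T> {
        public final long delayMs;
        public final List<T> chunk;

        Planned(long delayMs, List<T> chunk) {
            this.delayMs = delayMs;
            this.chunk = chunk;
        }
    }

    /** Splits items into consecutive chunks; chunk i gets a delay of i * stepMs. */
    public static <T> List<Planned<T>> plan(List<T> items, int chunkSize, long stepMs) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Planned<T>> out = new ArrayList<>();
        int index = 0;
        for (int start = 0; start < items.size(); start += chunkSize) {
            // Clamp the end so the last chunk may be shorter than chunkSize.
            int end = Math.min(start + chunkSize, items.size());
            List<T> chunk = new ArrayList<>(items.subList(start, end));
            out.add(new Planned<>(index * stepMs, chunk));
            index++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int n = 0; n < 250; n++) {
            items.add(n);
        }
        for (Planned<Integer> p : plan(items, 100, 100L)) {
            System.out.println(p.delayMs + " ms -> " + p.chunk.size() + " items");
        }
    }
}
```

For 250 items with a chunk size of 100 and a 100 ms step, this plans three chunks of 100, 100 and 50 items at 0, 100 and 200 ms. Inside the flatMap, each planned entry would correspond to one rx.Observable.from(chunk).delay(delayMs, TimeUnit.MILLISECONDS) merged into the result.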
Upvotes: 2