Cihan Keser
Cihan Keser

Reputation: 3261

How to emit items from a Collection with delay in RxJava?

I have the below code emitting items from a collection.

// fills the given coll with some items and
// returns the observable emitting the filled collection
// does ASYNC work
Observable<Collection> generate(Collection coll){ ... }

def coll = []
generate(coll).flatMap({ filledColl ->
    rx.Observable.from(filledColl)
}).subscribe({ ... })

The problem is that this collection can contain thousands of items and since generate works async, this code causes the subscribe method to be called thousands of times almost instantly (which is not wanted for the work I'm doing inside observer).

How can I modify this code to emit items from collection with a delay? For example: emit 100 items then wait 100ms then emit another 100 items or wait 10ms before emitting next item?

Upvotes: 2

Views: 1397

Answers (1)

Andr&#233; Staltz
Andr&#233; Staltz

Reputation: 14004

Inside the flatMap, you need to split the filledColl into smaller parts, delay each part, and merge them all into one observable which you will return inside the flatMap.

generate(coll).flatMap({ filledColl ->
    def chunkSize = 100
    resultStream = rx.Observable.never()
    for (i in 0 ..< filledCol.size()/chunkSize) {
        def chunk = filledCol[i*chunkSize .. (i+1)*chunkSize]
        resultStream = resultStream.mergeWith(
            rx.Observable.from(chunk).delay(100*i, TimeUnit.MILLISECONDS)
        )
    }
    resultStream
}).subscribe({ ... })

That's just the rough idea, you might still want to test, tweak and correct according to your needs. Also, it might make more sense to move this into the generate function, but that's up to you since I cannot know what is in generate().

Upvotes: 2

Related Questions