athor
athor

Reputation: 6928

RxJava delay for each item of list emitted

I'm struggling to implement something I assumed would be fairly simple in Rx.

I have a list of items, and I want to have each item emitted with a delay.

It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.

Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

The result is:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

But what I would expect to see is something like this:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

What am I doing wrong?

Upvotes: 77

Views: 69927

Answers (17)

Dabbler
Dabbler

Reputation: 9863

Regarding eis' comment "wonder why isn't any of the answers actually answering the question. Why isn't this working, what's wrong with it?":

It's behaving differently than expected because delaying an item means its emission time is delayed relative to the time the item would otherwise be emitted - not relative to the previous item.

Imagine the OP's observable without any delay: All items are emitted in quick succession (in the same millisecond). With delay, each item is emitted later. But since the same delay is applied to each item, their relative emission times do not change. They are still emitted in one millisecond.

Think of a person entering the room at 14:00. Another person enters at 14:01. If you apply a delay of one hour to both, they enter at 15:00 and 15:01. There is still just one minute between them.

Upvotes: 1

Houwert
Houwert

Reputation: 200

A Swift extension for both approaches suggested in this post.

Concat

import RxSwift

extension Observable {
    public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
    }
}

Zip

import RxSwift

extension Observable {
    public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return Observable.zip(
            Observable<Int>.interval(period, scheduler: scheduler),
            self
        ) { $1 }
    }
}

Usage

Observable.range(start: 1, count: 5)
    .delayEach(.seconds(1), scheduler: MainScheduler.instance)

My personal preference goes out to the concat approach since it will also work as expected when the upstream emits items at a slower rate than the delay interval.

And yes the original post is RxJava specific, but Google also brings you here for RxSwift queries.

Upvotes: 1

Magnus
Magnus

Reputation: 8300

The simplest way to do this seems to be just using concatMap and wrapping each item in a delayed Obserable.

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

Prints:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

Upvotes: 80

Tim
Tim

Reputation: 43304

For kotlin users, I wrote an extension function for the 'zip with interval' approach

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

It works the same way, but this makes it reusable. Example:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

Upvotes: 24

Vairavan
Vairavan

Reputation: 1296

Observable.just("A", "B", "C", "D", "E", "F")
    .flatMap { item -> Thread.sleep(2000)
        Observable.just( item ) }
    .subscribe { println( it ) }

Upvotes: 1

yaircarreno
yaircarreno

Reputation: 4257

To introduce delay between each item emitted is useful:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

More good options at https://github.com/ReactiveX/RxJava/issues/3505

Upvotes: 5

Duy Phan
Duy Phan

Reputation: 803

You can use

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

This code above not duplicate value(index). "I'm sure"

Upvotes: 1

Vlad
Vlad

Reputation: 525

I think it's exactly what you need. Take look:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });

Upvotes: 6

Mina Wissa
Mina Wissa

Reputation: 10971

Just sharing a simple approach to emit each item in a collection with an interval:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

Each item will be emitted every 500 milliseconds

Upvotes: 49

Abhishek Bansal
Abhishek Bansal

Reputation: 5335

you should be able to achieve this by using Timer operator. I tried with delay but couldn't achieve the desired output. Note nested operations done in flatmap operator.

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));

Upvotes: 0

cVoronin
cVoronin

Reputation: 1350

You can add a delay between emitted items by using flatMap, maxConcurrent and delay()

Here is an example - emit 0..4 with delay

@Test
fun testEmitWithDelays() {
    val DELAY = 500L
    val COUNT = 5

    val latch = CountDownLatch(1)
    val startMoment = System.currentTimeMillis()
    var endMoment : Long = 0

    Observable
        .range(0, COUNT)
        .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
        .subscribe(
                { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                {},
                {
                    endMoment = System.currentTimeMillis()
                    latch.countDown()
                })

    latch.await()

    assertTrue { endMoment - startMoment >= DELAY * COUNT }
}

... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

Upvotes: 0

Sanket Kachhela
Sanket Kachhela

Reputation: 10856

There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.

here what i have tried.

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

Upvotes: 1

Tushar Nallan
Tushar Nallan

Reputation: 784

A not so clean way is to make the delay change with the iteration using the .delay(Func1) operator.

Observable.range(1, 5)
            .delay(n -> n*50)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

Upvotes: 1

Matias Irland Tomas
Matias Irland Tomas

Reputation: 53

You can implement a custom rx operator such as MinRegularIntervalDelayOperator and then use this with the lift function

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

Upvotes: 3

kjones
kjones

Reputation: 5823

To delay each group you can change your flatMap() to return an Observable that delays emitting the group.

Observable
        .range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g ->
                Observable
                        .timer(50, TimeUnit.MILLISECONDS)
                        .flatMap(t -> g.toList())
        )
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        }).toList().toBlocking().first();

Upvotes: 1

iagreen
iagreen

Reputation: 31996

One way to do it is to use zip to combine your observable with an Interval observable to delay the output.

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

Upvotes: 68

FriendlyMikhail
FriendlyMikhail

Reputation: 2927

I think you want this:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.

Upvotes: 0

Related Questions