Reputation: 6928
I'm struggling to implement something I assumed would be fairly simple in Rx.
I have a list of items, and I want to have each item emitted with a delay.
It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.
Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
The result is:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
But what I would expect to see is something like this:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
What am I doing wrong?
Upvotes: 77
Views: 69927
Reputation: 9863
Regarding eis' comment "wonder why isn't any of the answers actually answering the question. Why isn't this working, what's wrong with it?":
It's behaving differently than expected because delaying an item means its emission time is delayed relative to the time the item would otherwise be emitted - not relative to the previous item.
Imagine the OP's observable without any delay: All items are emitted in quick succession (in the same millisecond). With delay, each item is emitted later. But since the same delay is applied to each item, their relative emission times do not change. They are still emitted in one millisecond.
Think of a person entering the room at 14:00. Another person enters at 14:01. If you apply a delay of one hour to both, they enter at 15:00 and 15:01. There is still just one minute between them.
Upvotes: 1
Reputation: 200
A Swift extension for both approaches suggested in this post.
Concat
import RxSwift
extension Observable {
public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
}
}
Zip
import RxSwift
extension Observable {
public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return Observable.zip(
Observable<Int>.interval(period, scheduler: scheduler),
self
) { $1 }
}
}
Usage
Observable.range(start: 1, count: 5)
.delayEach(.seconds(1), scheduler: MainScheduler.instance)
My personal preference goes out to the concat approach since it will also work as expected when the upstream emits items at a slower rate than the delay interval.
And yes the original post is RxJava specific, but Google also brings you here for RxSwift queries.
Upvotes: 1
Reputation: 8300
The simplest way to do this seems to be just using concatMap
and wrapping each item in a delayed Obserable.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Prints:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Upvotes: 80
Reputation: 43304
For kotlin users, I wrote an extension function for the 'zip with interval' approach
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
It works the same way, but this makes it reusable. Example:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
Upvotes: 24
Reputation: 1296
Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }
Upvotes: 1
Reputation: 4257
To introduce delay between each item emitted is useful:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
More good options at https://github.com/ReactiveX/RxJava/issues/3505
Upvotes: 5
Reputation: 803
You can use
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});
This code above not duplicate value(index). "I'm sure"
Upvotes: 1
Reputation: 525
I think it's exactly what you need. Take look:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
Upvotes: 6
Reputation: 10971
Just sharing a simple approach to emit each item in a collection with an interval:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Each item will be emitted every 500 milliseconds
Upvotes: 49
Reputation: 5335
you should be able to achieve this by using Timer
operator. I tried with delay
but couldn't achieve the desired output. Note nested operations done in flatmap
operator.
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));
Upvotes: 0
Reputation: 1350
You can add a delay between emitted items by using flatMap, maxConcurrent and delay()
Here is an example - emit 0..4 with delay
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
Upvotes: 0
Reputation: 10856
There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.
here what i have tried.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Upvotes: 1
Reputation: 784
A not so clean way is to make the delay change with the iteration using the .delay(Func1) operator.
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Upvotes: 1
Reputation: 53
You can implement a custom rx operator such as MinRegularIntervalDelayOperator
and then use this with the lift
function
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Upvotes: 3
Reputation: 5823
To delay each group you can change your flatMap()
to return an Observable that delays emitting the group.
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Upvotes: 1
Reputation: 31996
One way to do it is to use zip
to combine your observable with an Interval
observable to delay the output.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Upvotes: 68
Reputation: 2927
I think you want this:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.
Upvotes: 0