Reputation: 4490
The RxJava documentation's definition of switchMap is rather vague, and it links to the same page as flatMap. What is the difference between the two operators?
Upvotes: 174
Views: 74008
Reputation: 6261
Code example
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class SwitchMapTest {

    Logger logger = LogManager.getLogger();

    @Test
    public void main() throws InterruptedException {
        log("main thread");
        CountDownLatch latch = new CountDownLatch(1);
        var disposable = Observable
                .create(emitter -> {
                    IntStream.range(0, 4)
                            .peek(i -> {
                                log("sleep emit");
                                sleep(TimeUnit.SECONDS, 1);
                            })
                            .forEach(emitter::onNext);
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.io())
                .switchMap(o ->
                        Observable.create(emitter -> {
                                    IntStream.range(0, 2).forEach(value -> {
                                        log("sleep switch");
                                        sleep(TimeUnit.MILLISECONDS, 900);
                                        emitter.onNext("original " + o + " | switchMap " + value);
                                    });
                                    emitter.onComplete();
                                })
                                .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor(r -> {
                                    Thread thread = new Thread(r);
                                    thread.setDaemon(true);
                                    return thread;
                                })))
                )
                .observeOn(Schedulers.newThread())
                .subscribe(this::log, throwable -> logger.throwing(throwable), () -> {
                    log("complete");
                    latch.countDown();
                });
        boolean await = latch.await(10, TimeUnit.SECONDS);
        assertTrue(await);
        disposable.dispose();
    }

    private void sleep(@NotNull TimeUnit timeUnit, int timeout) {
        try {
            timeUnit.sleep(timeout);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    void log(Object message) {
        logger.debug(message);
    }
}
log4j2.xml
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%-8r %d{HH:mm:ss.SSS} [%-32t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="ALL">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
Console
720 21:44:46.566 [Test worker ] DEBUG SwitchMapTest - main thread
787 21:44:46.633 [RxCachedThreadScheduler-1 ] DEBUG SwitchMapTest - sleep emit
1789 21:44:47.635 [RxCachedThreadScheduler-1 ] DEBUG SwitchMapTest - sleep emit
1790 21:44:47.636 [Thread-3 ] DEBUG SwitchMapTest - sleep switch
2695 21:44:48.541 [Thread-3 ] DEBUG SwitchMapTest - sleep switch
2695 21:44:48.541 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - original 0 | switchMap 0
2792 21:44:48.638 [RxCachedThreadScheduler-1 ] DEBUG SwitchMapTest - sleep emit
2792 21:44:48.638 [Thread-4 ] DEBUG SwitchMapTest - sleep switch
3693 21:44:49.539 [Thread-4 ] DEBUG SwitchMapTest - sleep switch
3693 21:44:49.539 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - original 1 | switchMap 0
3796 21:44:49.642 [RxCachedThreadScheduler-1 ] DEBUG SwitchMapTest - sleep emit
3797 21:44:49.643 [Thread-5 ] DEBUG SwitchMapTest - sleep switch
4699 21:44:50.545 [Thread-5 ] DEBUG SwitchMapTest - sleep switch
4699 21:44:50.545 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - original 2 | switchMap 0
4802 21:44:50.648 [Thread-6 ] DEBUG SwitchMapTest - sleep switch
5706 21:44:51.552 [Thread-6 ] DEBUG SwitchMapTest - sleep switch
5706 21:44:51.552 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - original 3 | switchMap 0
6612 21:44:52.458 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - original 3 | switchMap 1
6612 21:44:52.458 [RxNewThreadScheduler-1 ] DEBUG SwitchMapTest - complete
Upvotes: 0
Reputation: 6899
Map, FlatMap, ConcatMap and SwitchMap each apply a function to the data emitted by an Observable.
Map transforms each item emitted by the source Observable and emits the transformed item.
FlatMap, SwitchMap and ConcatMap also apply a function to each emitted item, but instead of returning a transformed item, the function returns an Observable, which can itself emit items.
FlatMap and ConcatMap work in much the same way: both merge the items emitted by multiple Observables and return a single Observable.
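To make the distinction above concrete, here is a minimal sketch, assuming RxJava 3 on the classpath. The class and method names (MapVsFlatMap, mapped, flatMapped) are made up for illustration. With map the function returns a plain value; with flatMap it returns an Observable whose items are merged into the output.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class MapVsFlatMap {

    // map: every source item is turned into exactly one output item.
    static List<String> mapped() {
        return Observable.just(1, 2, 3)
                .map(i -> "item " + i)
                .toList()
                .blockingGet();
    }

    // flatMap: every source item is turned into an inner Observable,
    // and the items of all inner Observables end up in one stream.
    static List<String> flatMapped() {
        return Observable.just(1, 2, 3)
                .flatMap(i -> Observable.just(i + "a", i + "b"))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mapped());      // [item 1, item 2, item 3]
        System.out.println(flatMapped());  // [1a, 1b, 2a, 2b, 3a, 3b]
    }
}
```

The inner Observables here emit synchronously, which is why the flatMap output happens to stay in source order; that is not guaranteed once the inner Observables run asynchronously.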
Upvotes: 17
Reputation: 2615
No flatMap discussion is complete without comparing and contrasting it with switchMap, concatMap and concatMapEager.
All of these methods take a Func1 that transforms each item of the stream into an Observable, whose items are then emitted; the difference is when the returned Observables are subscribed to and unsubscribed from, and if and when the emissions of those Observables are emitted by the ____Map operator in question.
flatMap subscribes to as many emitted Observables as possible. (The number is platform dependent, e.g. lower on Android.) Use this when order is NOT important and you want emissions ASAP.
concatMap subscribes to the first Observable and only subscribes to the next Observable when the previous one has completed. Use this when order is important and you want to conserve resources. A perfect example is deferring a network call by checking the cache first. That may typically be followed by a .first() or .takeFirst() to avoid doing unnecessary work.
http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/
concatMapEager works much the same, but it subscribes to as many Observables as possible (platform dependent) and only emits the items of an Observable once the previous Observable has completed. Perfect when you have a lot of parallel processing that needs to be done, but (unlike flatMap) you want to maintain the original order.
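The ordering differences between flatMap, concatMap and concatMapEager can be sketched as follows, assuming RxJava 3 and its TestScheduler for virtual time. The class OrderingDemo, the helper slow and the delays are invented for this illustration: item 1 takes 30 ms, item 2 takes 10 ms and item 3 takes 20 ms, and the method reports what has been emitted after 30 ms of virtual time.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OrderingDemo {

    // Hypothetical per-item latency in milliseconds (index = item).
    static final int[] DELAY_MS = {0, 30, 10, 20};

    // An inner Observable that emits the item after its latency.
    static Observable<Integer> slow(int item, TestScheduler scheduler) {
        return Observable.just(item)
                .delay(DELAY_MS[item], TimeUnit.MILLISECONDS, scheduler);
    }

    // Returns what the chosen operator has emitted after 30 ms of virtual time.
    static List<Integer> after30ms(String operator) {
        TestScheduler scheduler = new TestScheduler();
        Observable<Integer> source = Observable.just(1, 2, 3);
        Observable<Integer> result;
        switch (operator) {
            case "flatMap":
                result = source.flatMap(i -> slow(i, scheduler));
                break;
            case "concatMap":
                result = source.concatMap(i -> slow(i, scheduler));
                break;
            default:
                result = source.concatMapEager(i -> slow(i, scheduler));
                break;
        }
        TestObserver<Integer> observer = result.test();
        scheduler.advanceTimeBy(30, TimeUnit.MILLISECONDS);
        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(after30ms("flatMap"));        // [2, 3, 1]
        System.out.println(after30ms("concatMap"));      // [1]
        System.out.println(after30ms("concatMapEager")); // [1, 2, 3]
    }
}
```

flatMap emits in arrival order, concatMap has only finished the first inner Observable by then (the other two have not even been subscribed to), and concatMapEager has run all three in parallel but released the results in source order.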
switchMap will subscribe to the last Observable it encounters and unsubscribe from all previous Observables. This is perfect for cases like search suggestions: once a user has changed their search query, the old request is no longer of any interest, so it is unsubscribed, and a well-behaved API endpoint will cancel the network request.
If you are returning Observables that don't subscribeOn another thread, all of the above methods may behave much the same. The interesting and useful behaviour emerges when you allow the nested Observables to act on their own threads. Then you can get a lot of benefit from parallel processing, and from intelligently unsubscribing from, or not subscribing to, Observables that don't interest your Subscribers.
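The point about nested Observables that stay on the calling thread can be checked with a small sketch, assuming RxJava 3. The class SynchronousInner and its methods are hypothetical names. Each inner Observable emits its two items synchronously during subscription, so there is never an older inner Observable still running when the next source item arrives.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class SynchronousInner {

    // A synchronous inner Observable: both items are emitted while subscribing.
    static Observable<String> inner(int i) {
        return Observable.just(i + "a", i + "b");
    }

    static List<String> viaFlatMap() {
        return Observable.just(1, 2, 3).flatMap(SynchronousInner::inner).toList().blockingGet();
    }

    static List<String> viaConcatMap() {
        return Observable.just(1, 2, 3).concatMap(SynchronousInner::inner).toList().blockingGet();
    }

    static List<String> viaSwitchMap() {
        return Observable.just(1, 2, 3).switchMap(SynchronousInner::inner).toList().blockingGet();
    }

    public static void main(String[] args) {
        // All three print [1a, 1b, 2a, 2b, 3a, 3b]
        System.out.println(viaFlatMap());
        System.out.println(viaConcatMap());
        System.out.println(viaSwitchMap());
    }
}
```

The operators only start to differ once the inner Observables emit asynchronously, for example after a subscribeOn or a delay.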
amb may also be of interest. Given any number of Observables, it emits the same items as the first Observable to emit anything.
That could be useful when you have multiple sources that could/should return the same thing and you want performance, e.g. for sorting, you might amb a quick-sort with a merge-sort and use whichever was faster.
Upvotes: 104
Reputation: 11515
According to the documentation ( http://reactivex.io/documentation/operators/flatmap.html ), switchMap is like flatMap, but it will only emit items from the new observable until a new event is emitted from the source observable.
The marble diagram shows it well. Notice the difference in the diagrams:
In switchMap the second original emission (green marble) does not emit its second mapped emission (green square), since the third original emission (blue marble) has begun and already emitted its first mapped emission (blue diamond). In other words, only the first of two mapped green emissions happens; no green square is emitted because the blue diamond beat it.
In flatMap, all mapped results will be emitted, even if they're "stale". In other words, both the first and the second of the mapped green emissions happen: a green square would have been emitted if they had used a consistent map function; since they did not, you see the second green diamond, even though it is emitted after the first blue diamond.
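The marble-diagram behaviour described above can be reproduced with a rough sketch, assuming RxJava 3 and a TestScheduler for virtual time. The class MarbleDemo and the timings are made up: each colour maps to two items, emitted 10 ms and 70 ms after the colour arrives, and "blue" arrives before the second green item is due.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MarbleDemo {

    static List<String> run(boolean useSwitchMap) {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> source = PublishSubject.create();

        // Each colour becomes two items: colour1 after 10 ms, colour2 after 70 ms.
        Function<String, Observable<String>> twoItems = color ->
                Observable.intervalRange(1, 2, 10, 60, TimeUnit.MILLISECONDS, scheduler)
                        .map(n -> color + n);

        Observable<String> mapped;
        if (useSwitchMap) {
            mapped = source.switchMap(twoItems);
        } else {
            mapped = source.flatMap(twoItems);
        }
        TestObserver<String> observer = mapped.test();

        source.onNext("red");                                  // t = 0
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        source.onNext("green");                                // t = 100
        scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
        source.onNext("blue");                                 // t = 150, green2 is due at t = 170
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [red1, red2, green1, blue1, green2, blue2]
        System.out.println(run(true));  // [red1, red2, green1, blue1, blue2]
    }
}
```

With flatMap the "stale" green2 still arrives after blue1; with switchMap it is dropped, because the green inner Observable was unsubscribed as soon as "blue" arrived.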
Upvotes: 203
Reputation: 13372
Here is one more example, 101 lines long, which explained the behaviour for me.
As was said: switchMap takes the last observable (the slowest one, if you will) and ignores the rest.
As a result:
Time | scheduler | state
----------------------------
0 | main | Starting
84 | main | Created
103 | main | Subscribed
118 | Sched-C-0 | Going to emmit: A
119 | Sched-C-1 | Going to emmit: B
119 | Sched-C-0 | Sleep for 1 seconds for A
119 | Sched-C-1 | Sleep for 2 seconds for B
1123 | Sched-C-0 | Emitted (A) in 1000 milliseconds
2122 | Sched-C-1 | Emitted (B) in 2000 milliseconds
2128 | Sched-C-1 | Got B processed
2128 | Sched-C-1 | Completed
As you can see, A was ignored.
Upvotes: 3
Reputation: 13481
If you're looking for example code:
/**
 * We switch from the original item to a new Observable using switchMap.
 * It is a way to replace the Observable itself, instead of just the item as map does.
 * Emitted: Person{name='Pablo', age=0, sex='no_sex'}
 */
@Test
public void testSwitchMap() {
    Observable.just(new Person("Pablo", 34, "male"))
            .switchMap(person -> Observable.just(new Person("Pablo", 0, "no_sex")))
            .subscribe(System.out::println);
}
You can see more examples here https://github.com/politrons/reactive
Upvotes: 0
Reputation: 1249
switchMap was once called flatMapLatest in RxJS 4.
It basically just passes on the events from the latest Observable and unsubscribes from the previous one.
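The unsubscription can be observed directly. Below is a small sketch, assuming RxJava 3; the class SwitchMapDisposal and the 100 ms of simulated work are invented for illustration. doOnDispose records the moment switchMap drops the older inner Observable.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SwitchMapDisposal {

    static List<String> run() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> source = PublishSubject.create();
        List<String> events = new ArrayList<>();

        source.switchMap(item ->
                        // Simulated work that takes 100 ms of virtual time.
                        Observable.just("processed " + item)
                                .delay(100, TimeUnit.MILLISECONDS, scheduler)
                                // Runs when switchMap unsubscribes from this inner Observable.
                                .doOnDispose(() -> events.add("unsubscribed from " + item)))
                .subscribe(events::add);

        source.onNext("A");
        scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);  // A is still in flight
        source.onNext("B");                                  // switchMap drops A here
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); // B finishes
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [unsubscribed from A, processed B]
    }
}
```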
Upvotes: 62
Reputation: 3037
I came across this when implementing "instant search" - i.e. when user types in a text box, and results appear in near real-time with each key stroke. The solution seems to be:
With flatMap, the search results could be stale, because search responses may come back out of order. To fix this, switchMap should be used, since it ensures that an old observable is unsubscribed once a newer one is provided.
So, in summary, flatMap should be used when all results matter, regardless of their timing, and switchMap should be used when only results from the last Observable matter.
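A rough sketch of the instant-search case, assuming RxJava 3 and a TestScheduler. The class InstantSearch, the search method and its latencies are hypothetical stand-ins for a real backend call: the one-letter query is deliberately slower than the longer one, so the responses come back out of order.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class InstantSearch {

    // Hypothetical backend: one-letter queries take 300 ms, longer ones 100 ms.
    static Observable<String> search(String query, TestScheduler scheduler) {
        long latencyMs = query.length() == 1 ? 300 : 100;
        return Observable.just("results for '" + query + "'")
                .delay(latencyMs, TimeUnit.MILLISECONDS, scheduler);
    }

    // Returns the result that is on screen once all responses have arrived.
    static String lastShown(boolean useSwitchMap) {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> keystrokes = PublishSubject.create();

        Observable<String> results;
        if (useSwitchMap) {
            results = keystrokes.switchMap(q -> search(q, scheduler));
        } else {
            results = keystrokes.flatMap(q -> search(q, scheduler));
        }
        TestObserver<String> ui = results.test();

        keystrokes.onNext("r");                              // response due at t = 300
        scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
        keystrokes.onNext("rx");                             // response due at t = 150
        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

        List<String> shown = ui.values();
        return shown.get(shown.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(lastShown(false)); // results for 'r'  (stale)
        System.out.println(lastShown(true));  // results for 'rx'
    }
}
```

With flatMap the slow response for the outdated query "r" arrives last and overwrites the newer one; with switchMap that request is unsubscribed when "rx" is typed, so only the latest query's results are shown.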
Upvotes: 183