Reputation: 1315
I am interested in how flatMap controls its "child" threads. For example, the following code works fine:
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
            .sequential();
}
And this code stops after being invoked 128 times (that is the default maxConcurrency for a Flowable):
private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
    return mPlcIntervalFlowable.onBackpressureLatest()
            .subscribeOn(Schedulers.single())
            .publish();
}
Subscribe:
addDisposable(mGetPlcUpdatesChanelUseCase.execute()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(plcDto -> Timber.d("plcReceiver"),
                Timber::e));
UseCase:
public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {

    private final PlcRepository mPlcRepository;

    public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
        mPlcRepository = plcRepository;
    }

    @Override
    public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
        return mPlcRepository.getUpdatesChannel();
    }

    @Override
    public boolean isParamsRequired() {
        return false;
    }
}
Repo method:
@Override
public Flowable<PlcDto> getUpdatesChannel() {
    return mPlcCore.getPlcConnectableFlowable()
            .map(mPlcInfoTopPlcDtoTransformer::transform);
}
PlcCore method:
public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
    return mConnectableFlowable;
}
And mConnectableFlowable is:
mConnectableFlowable = createConnectablePlcFlowable();
mConnectableFlowable.connect();
So as I understand it, mDataPackageFlowable is created once, and then each time it is executed it creates a new "thread" for its child, and after 128 executions it simply blocks all the following ones.
So there are 3 main questions:
1) Does flatMap control child threads?
2) Why does it execute every new "request" on a new thread? (Or maybe it doesn't; tell me if so.)
3) In what cases can we lose control over child threads?
DISCLAIMER: English is my second language; if something is not clear, ask me and I'll try to add a clarification.
EDIT:
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .parallel()
            .runOn(Schedulers.computation())
            .sequential();
}
This combination does not work: it does remove the 128-invocation flatMap limit, but it does not clean up the older inner subscriptions, which leads to a memory leak and OOM exceptions. Use some kind of map instead.
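For example, something like switchMap() keeps only one inner subscription alive at a time, disposing the previous one whenever a new tick arrives (just a sketch of what "some kind of map" could look like here, not necessarily the exact fix):
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
    // Sketch only: switchMap() cancels the previous inner subscription each time
    // the interval emits, so old inner subscriptions cannot pile up the way they
    // can with an unbounded flatMap().
    return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
            .onBackpressureLatest()
            .switchMap(aLong -> mDataPackageFlowable);
}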
Upvotes: 3
Views: 774
Reputation: 8227
There needs to be a subscription for an observer chain to work properly. When you use interval() to generate data, you are providing a "hot" observable that emits values on its own; a "cold" observable would emit values only when a subscription occurs.
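A minimal sketch of that distinction, using the same Flowable/ConnectableFlowable types as the question (the sources below are purely illustrative):
// Cold: emits only when something subscribes; each subscriber gets its own value.
Flowable<Long> cold = Flowable.fromCallable(System::currentTimeMillis);

// Hot: once connect() is called, the source emits on its own,
// whether or not anything is subscribed downstream.
ConnectableFlowable<Long> hot = Flowable.interval(1, TimeUnit.SECONDS).publish();
hot.connect(); // late subscribers miss the earlier ticks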
128 is the number of entries that are buffered by flatMap() before it stalls. If there is a subscription, then flatMap() would emit downstream the values that the interior observable produces, and it wouldn't stall.
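For reference (assuming RxJava 2), that 128 is Flowable.bufferSize(), which the no-argument flatMap() overload uses as its default maxConcurrency and which can be changed with the rx2.buffer-size system property:
// The default concurrency/buffer limit behind the 128 figure.
System.out.println(Flowable.bufferSize()); // typically prints 128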
flatMap() itself does not operate on a particular scheduler, according to the javadoc. That means it does not manipulate its subscriptions on particular threads. If you want to control the work being done in the observable invoked by flatMap(), then you use explicit scheduling:
observable
    .flatMap( value -> fun(value).subscribeOn( myScheduler ) )
    .subscribe();
myScheduler might, for instance, be Schedulers.io(), which creates threads when needed. Alternatively, it could be an Executor that you supply with a fixed number of threads. I have frequently used Executors with only one, two, or 48 threads allocated to them to control the fan-out from flatMap().
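A minimal sketch of the Executor-backed variant, assuming RxJava 2 and reusing the observable/fun placeholders from the snippet above (the pool size of 2 is arbitrary):
// Wrap a fixed-size thread pool in a Scheduler to cap the fan-out from flatMap().
ExecutorService pool = Executors.newFixedThreadPool(2);
Scheduler myScheduler = Schedulers.from(pool);

observable
    .flatMap( value -> fun(value).subscribeOn( myScheduler ) )
    .subscribe();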
You can also supply a parallelism parameter to flatMap(), which tells it the maximum number of subscriptions that it will maintain. When flatMap() reaches that maximum, it will buffer up requests until the observer chains that it subscribed to have completed.
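For instance, a sketch using the overload that takes the concurrency limit (the value 4 is arbitrary, and observable/fun/myScheduler are the same placeholders as above):
observable
    // At most 4 inner subscriptions are active at once; further upstream values
    // are held back until one of the inner chains completes.
    .flatMap( value -> fun(value).subscribeOn( myScheduler ), 4 )
    .subscribe();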
The parallel() operator does something similar, but it splits out the incoming events, emitting them on separate threads. Again, the javadoc has excellent descriptions, along with good pictures.
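A rough sketch of that shape, assuming RxJava 2's ParallelFlowable (the parallelism level of 4 is illustrative):
Flowable.range(1, 1000)
    .parallel(4)                      // split upstream values across 4 "rails"
    .runOn(Schedulers.computation())  // each rail gets its own worker
    .map(v -> v * v)                  // this work now runs in parallel
    .sequential()                     // merge the rails back into one Flowable
    .subscribe(System.out::println);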
It is always possible to lose control over threads. When you use an RxJava operator, read the documentation for it. There are two areas you want to understand. The first area is what scheduler the operator works on. If it says that it does not operate on a particular scheduler, then it does not directly affect the choice of threads or how threads are used. If it states that it uses a particular scheduler, then you need to understand how that scheduler works; there will always be another version of the operator that allows you to supply your own choice of scheduler.
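For example (delay() is just one operator whose javadoc names a default scheduler and that also offers an overload accepting your own):
Flowable.just(1)
    .delay(1, TimeUnit.SECONDS)                   // javadoc: runs on computation() by default
    .delay(1, TimeUnit.SECONDS, Schedulers.io())  // same operator, explicit scheduler
    .subscribe(System.out::println);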
The second area you must understand is back pressure. You need to understand what back pressure means and how it is applied. This is especially important whenever you step over a thread boundary, such as by using observeOn() or subscribeOn().
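As a rough illustration (assuming RxJava 2; the prefetch of 16 and the sleep are only there to make the pressure visible):
Flowable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()                     // keep only the latest item when the consumer falls behind
    .observeOn(Schedulers.single(), false, 16)  // thread hop with a 16-element prefetch buffer
    .subscribe(v -> Thread.sleep(10));          // slow consumer on the other side of the boundary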
Upvotes: 3