Reputation: 962
I need a way to push messages asynchronously to my PublishSubject and to process them at a fixed pace (actually one by one) via a ConnectableObservable. Unfortunately, it seems that the call to onNext of the PublishSubject does not return until the underlying Subscriber has processed the message.
It takes a good few seconds to process each message, and in debug mode I can see that the processing runs before the method that pushes the message to the PublishSubject is removed from the stack: "After push..." always appears in the console after the internal logs from inside the Subscriber...
So I have this RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Message metadata = processor.apply(extId);
                log.info("Before push...");
                dataImporter.pushData(metadata);
                log.info("After push...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    return Response.ok("Request received successfully").build();
}
Here's the constructor of the DataImporter:
public DataImporter(final String configFile) {
    dataToImportSubject = PublishSubject.create();
    dataToImportObservable = dataToImportSubject.publish();
    dataToImportObservable.connect();
    dataToImportObservable
        .onBackpressureBuffer(1, new Action0() {
            @Override
            public void call() {
                logger.debug("Buffer full...");
            }
        })
        .subscribeOn(Schedulers.io())
        .subscribe(new Subscriber<Message>() {
            @Override
            public void onCompleted() {
                // TODO Auto-generated method stub
            }
            @Override
            public void onError(Throwable e) {
                logger.error("Error importing "+e.getMessage());
            }
            @Override
            public void onNext(Message value) {
                request(1);
                importResult(configFile, value);
            }
            @Override
            public void onStart() {
                request(1);
            }
        });
}
Then pushData of DataImporter just pushes to the PublishSubject's onNext method:
public void pushData(Message metadata) {
    dataToImportSubject.onNext(metadata);
}
And here are the declarations of the PublishSubject and the ConnectableObservable:
public class DataImporter implements ImporterProxy {
    private final PublishSubject<Message> dataToImportSubject;
    private final ConnectableObservable<Message> dataToImportObservable;
Upvotes: 0
Views: 1270
Reputation: 69997
PublishSubjects emit to their consumers on the thread of the original onXXX call:
Scheduler:
PublishSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.
You have to move the processing to some other thread with observeOn, because observeOn can move the onXXX calls to another thread.
subscribeOn does not have any practical effect on Subjects in general, because it only affects the subscription thread and won't modulate the subsequent onXXX calls to those subjects.
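To make the mechanism concrete, here is a minimal sketch in plain Java that does not use RxJava at all. SyncSubject, HandOffDemo, and the "import-worker" thread name are hypothetical stand-ins invented for illustration: SyncSubject mimics how a PublishSubject calls its observers on the caller's thread, and the observeOn helper only approximates the real operator by queuing each value on a single-threaded executor so the caller returns immediately.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a subject: like PublishSubject, it has no scheduler
// of its own and notifies observers on whichever thread calls onNext.
final class SyncSubject<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // The caller is blocked here until every observer has finished.
    void onNext(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}

final class HandOffDemo {
    // Rough approximation of observeOn: queue the value on the worker and return.
    // A single-threaded executor also handles the queued values one by one, in order.
    static <T> Consumer<T> observeOn(ExecutorService worker, Consumer<T> downstream) {
        return value -> worker.execute(() -> downstream.accept(value));
    }

    // Pushes one value and returns the name of the thread that consumed it.
    static String consumerThreadName(boolean handOff) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "import-worker"));
        SyncSubject<String> subject = new SyncSubject<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        Consumer<String> importer = value -> seen.add(Thread.currentThread().getName());

        subject.subscribe(handOff ? observeOn(worker, importer) : importer);
        subject.onNext("message-1");

        // Wait for any queued work so the result is available before returning.
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get(0);
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + consumerThreadName(false));
        System.out.println("hand-off: " + consumerThreadName(true));
    }
}
```

Without the hand-off the importer runs on the pushing thread, which is why "After push..." is logged only after the import finishes. With the hand-off it runs on the worker thread. In the question's chain, the equivalent change would presumably be to use observeOn(Schedulers.io()) where subscribeOn(Schedulers.io()) is now. Note that the executor's queue in this sketch is unbounded, so it says nothing about how the backpressure buffer should be sized.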
Upvotes: 2
Reputation: 8227
RxJava, by default, is synchronous. You need to introduce operators into your observer chain to perform actions on other threads. When you read the documentation on each operator in Observable, you will see statements like "... does not operate on a particular scheduler" -- this indicates that data flows through that operator synchronously.
To get an observer chain to perform actions on other threads, you can use an operator like subscribeOn() with a scheduler to have operations performed on that scheduler. In your example, you will likely want to use Schedulers.io() to provide a background thread.
Upvotes: 0