vasigorc

Reputation: 962

RxJava: PublishSubject acts synchronously

I need to push messages asynchronously to my PublishSubject and process them at a controlled pace (one at a time) via a ConnectableObservable. Unfortunately, the call to the PublishSubject's onNext does not seem to return until the underlying Subscriber has finished processing the message.

Each message takes a good few seconds to process, and in debug mode I can see that the processing runs before the method that pushes the message to the PublishSubject has been popped off the stack: "After push..." always appears in the console after the Subscriber's internal logs.

I have this REST endpoint:

@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            try {
                Message metadata = processor.apply(extId);
                log.info("Before push...");
                dataImporter.pushData(metadata);
                log.info("After push...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    return Response.ok("Request received successfully").build();

}

Here's the constructor of the DataImporter:

public DataImporter(final String configFile) {
        dataToImportSubject = PublishSubject.create();
        dataToImportObservable = dataToImportSubject.publish();
        dataToImportObservable.connect();
        dataToImportObservable
            .onBackpressureBuffer(1, new Action0() {

                @Override
                public void call() {
                    logger.debug("Buffer full...");
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<Message>() {

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Error importing "+e.getMessage());
                }

                @Override
                public void onNext(Message value) {
                    request(1);
                    importResult(configFile, value);
                }

                @Override
                public void onStart() {
                    request(1);
                }
            });
    }

The pushData method of DataImporter simply calls the PublishSubject's onNext method:

public void pushData(Message metadata) {
    dataToImportSubject.onNext(metadata);       
}

And here are the declarations of the PublishSubject and ConnectableObservable:

public class DataImporter implements ImporterProxy{

    private final PublishSubject<Message> dataToImportSubject;
    private final ConnectableObservable<Message> dataToImportObservable;

Upvotes: 0

Views: 1270

Answers (2)

akarnokd

Reputation: 69997

PublishSubjects emit to their consumers on the thread of the original onXXX call:

The Javadoc states, under "Scheduler":

PublishSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.

You have to move the processing to another thread with observeOn, because observeOn shifts the downstream onXXX calls onto the Scheduler you give it.

subscribeOn does not have any practical effect on Subjects in general because it only affects the subscription thread, and won't modulate the subsequent onXXX calls to those subjects.
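The threading behavior described above can be modeled without RxJava. The following is a minimal sketch in plain Java, not RxJava's implementation: `SyncSubject`, `handOff`, and `deliveryThread` are hypothetical names invented for this illustration. `SyncSubject` delivers each value on whichever thread calls `onNext`, as the Javadoc describes for PublishSubject, while `handOff` plays the role of observeOn by passing each value to a single-threaded executor so that `onNext` returns immediately and values are processed one at a time.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class HandOffDemo {

    /** Hypothetical stand-in for a subject: consumers run on the caller's thread. */
    static final class SyncSubject<T> {
        private final List<Consumer<T>> consumers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<T> consumer) {
            consumers.add(consumer);
        }

        void onNext(T value) {
            // No scheduler involved: this blocks until every consumer returns.
            for (Consumer<T> consumer : consumers) {
                consumer.accept(value);
            }
        }
    }

    /** Models the effect of observeOn: each value is processed on the executor. */
    static <T> Consumer<T> handOff(ExecutorService executor, Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    /** Pushes one value and returns the name of the thread that processed it. */
    public static String deliveryThread(boolean useHandOff) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            SyncSubject<String> subject = new SyncSubject<>();
            String[] seen = new String[1];
            CountDownLatch done = new CountDownLatch(1);
            Consumer<String> consumer = value -> {
                seen[0] = Thread.currentThread().getName();
                done.countDown();
            };
            subject.subscribe(useHandOff ? handOff(worker, consumer) : consumer);
            subject.onNext("message");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not run");
            }
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct:   " + deliveryThread(false));
        System.out.println("hand-off: " + deliveryThread(true));
    }
}
```

In the question's chain, the equivalent change would be to replace `.subscribeOn(Schedulers.io())` with `.observeOn(Schedulers.io())`, so that the slow `importResult` call no longer runs on the thread that invokes `pushData`.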

Upvotes: 2

Bob Dalgleish

Reputation: 8227

RxJava, by default, is synchronous. You need to introduce operators into your observer chain to perform actions on other threads. When you read the documentation for each operator in Observable, you will see statements like "... does not operate on a particular scheduler"; this indicates that data flows through that operator synchronously.

To get an observer chain to perform actions on other threads, you can use an operator like subscribeOn() with a scheduler to have operations performed on that scheduler. In your example, you likely will want to use Schedulers.io() to provide a background thread.

Upvotes: 0
