user432024

Reputation: 4675

How to process all events emitted by RxJava regardless of errors?

I'm using the vertx.io web framework to send a list of items to a downstream HTTP server.

records.records() emits 4 records, and I have deliberately configured the web client to connect to the wrong IP/port.

Processing... prints 4 times.

Exception outer! prints 3 times.

If I put back the proper IP/port, then Subscribe outer! prints 4 times.

io.reactivex.Flowable
    .fromIterable(records.records())
    .flatMap(inRecord -> {
        System.out.println("Processing...");

        // Do stuff here....
        Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(...));

        Single<HttpResponse<Buffer>> request = client
          .post(..., ..., ...)
          .rxSendStream(bodyBuffer);

        return request.toFlowable();
    })
    .subscribe(record -> {
        System.out.println("Subscribe outer!");
    }, ex -> {
        System.out.println("Exception outer! " + ex.getMessage());
    });

UPDATE:

I now understand that on error Rx stops right away. Is there a way to continue and process all records regardless, and get an error for each?

Upvotes: 1

Views: 770

Answers (2)

jaychang0917

Reputation: 1888

Is there a way to continue and process all records regardless and get an error for each?

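According to the docs, a stream terminates as soon as it signals an error: onError is a terminal event, so nothing is emitted after it. That means the outer subscriber cannot receive a separate onError call for each record.

You can use onErrorReturn() or onErrorResumeNext() to specify what should happen when a source fails (e.g. emit a fallback item or switch to Flowable.empty()). Apply them to the inner Flowable returned from flatMap, so that a failed request never reaches the outer stream. Note that RxJava 2 does not allow null items, so the fallback has to be a non-null value.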
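To make the difference concrete, here is a minimal sketch, assuming RxJava 2 is on the classpath. The send() method is a hypothetical stand-in for the HTTP call (it is not part of the question's code) and simply fails for even-numbered records. The first run shows the outer stream terminating on the first failure; the second shows onErrorReturn on the inner Flowable turning each failure into a regular item.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerItemErrorHandling {

    // Hypothetical stand-in for the real HTTP request: even records fail.
    public static Flowable<String> send(int record) {
        if (record % 2 == 0) {
            return Flowable.error(
                new IllegalStateException("connection refused for " + record));
        }
        return Flowable.just("ok " + record);
    }

    // No inner error handling: the first failure terminates the outer stream.
    public static List<String> runWithoutHandling() {
        List<String> log = new ArrayList<>();
        Flowable.fromIterable(Arrays.asList(1, 2, 3, 4))
            .flatMap(record -> send(record))
            .subscribe(log::add, ex -> log.add("outer error: " + ex.getMessage()));
        return log;
    }

    // onErrorReturn on the inner Flowable: each failure becomes an ordinary
    // item, so the outer stream keeps going and sees all four records.
    public static List<String> runWithHandling() {
        List<String> log = new ArrayList<>();
        Flowable.fromIterable(Arrays.asList(1, 2, 3, 4))
            .flatMap(record -> send(record)
                .onErrorReturn(ex -> "failed: " + ex.getMessage()))
            .subscribe(log::add, ex -> log.add("outer error: " + ex.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println("without handling: " + runWithoutHandling());
        System.out.println("with handling:    " + runWithHandling());
    }
}
```

Everything here runs synchronously on the calling thread, so the items arrive in record order. With real asynchronous requests, flatMap may interleave the results.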

Upvotes: 0

user432024

Reputation: 4675

Based on this article: https://medium.com/@jagsaund/5-not-so-obvious-things-about-rxjava-c388bd19efbc

I came up with the following. Let me know if you see anything wrong with it.

io.reactivex.Flowable
    .fromIterable(records.records())
    .flatMap(inRecord -> {
        Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(inRecord.toString()));

        Single<HttpResponse<Buffer>> request = client
            .post("xxxxxx", "xxxxxx", "xxxxxx")
            .rxSendStream(bodyBuffer);

        // So we can capture how long each request took.
        final long startTime = System.currentTimeMillis();

        return request.toFlowable()
            .doOnNext(response -> {
                // Capture total time and print it with the logs. Removed below for brevity.
                long processTimeMs = System.currentTimeMillis() - startTime;

                int status = response.statusCode();

                if (status == 200) {
                    logger.info("Success!");
                } else {
                    logger.error("Failed!");
                }
            })
            .doOnError(ex -> {
                long processTimeMs = System.currentTimeMillis() - startTime;

                logger.error("Failed! Exception.", ex);
            })
            .doOnTerminate(() -> {
                // Do some extra stuff here...
            })
            .onErrorResumeNext(Flowable.empty()); // This will allow us to continue.
    })
    .subscribe(); // Don't handle here. We subscribe to the inner events.
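If the outer subscriber should still learn about the failures rather than only logging them, one alternative is the flatMap overload that takes a delayErrors flag. The sketch below assumes RxJava 2 and again uses a hypothetical send() stand-in that fails for even-numbered records. All records are processed first, and the collected errors arrive together at the end, wrapped in a CompositeException when there is more than one.

```java
import io.reactivex.Flowable;
import io.reactivex.exceptions.CompositeException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelayErrorsExample {

    // Hypothetical stand-in for the real HTTP request: even records fail.
    public static Flowable<String> send(int record) {
        if (record % 2 == 0) {
            return Flowable.error(
                new IllegalStateException("connection refused for " + record));
        }
        return Flowable.just("ok " + record);
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable.fromIterable(Arrays.asList(1, 2, 3, 4))
            // delayErrors = true: keep processing, report errors at the end.
            .flatMap(DelayErrorsExample::send, true)
            .subscribe(
                log::add,
                ex -> {
                    if (ex instanceof CompositeException) {
                        // Several inner sources failed: unpack each cause.
                        for (Throwable t : ((CompositeException) ex).getExceptions()) {
                            log.add("error: " + t.getMessage());
                        }
                    } else {
                        // Exactly one inner source failed.
                        log.add("error: " + ex.getMessage());
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The trade-off is that the errors are no longer tied to the moment each request failed. If per-request timing or logging matters, handling the error on the inner Flowable as above is the better fit.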

Upvotes: 1
