Reputation: 4675
I'm using vertx.io web framework to send a list of items to a downstream HTTP server.
records.records()
emits 4 records and I have specifically set the web client to connect to the wrong I.P/port.
Processing...
prints 4 times.
Exception outer!
prints 3 times.
If I put back the proper I.P/port then Susbscribe outer!
prints 4 times.
io.reactivex.Flowable
.fromIterable(records.records())
.flatMap(inRecord -> {
System.out.println("Processing...");
// Do stuff here....
Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(...));
Single<HttpResponse<Buffer>> request = client
.post(..., ..., ...)
.rxSendStream(bodyBuffer);
return request.toFlowable();
})
.subscribe(record -> {
System.out.println("Subscribe outer!");
}, ex -> {
System.out.println("Exception outer! " + ex.getMessage());
});
UPDATE:
I now understand that on error RX stops right a way. Is there a way to continue and process all records regardless and get an error for each?
Upvotes: 1
Views: 770
Reputation: 1888
Is there a way to continue and process all records regardless and get an error for each?
According to the doc, the observable should be terminated if it encounters an error. So you can't get each error in onError
.
You can use onErrorReturn
or onErrorResumeNext()
to tell the upstream what to do if it encounters an error (e.g. emit null or Flowable.empty()
).
Upvotes: 0
Reputation: 4675
Given this article: https://medium.com/@jagsaund/5-not-so-obvious-things-about-rxjava-c388bd19efbc
I have come up with this... Unless you see something wrong with this?
io.reactivex.Flowable
.fromIterable(records.records())
.flatMap
(inRecord -> {
Observable<Buffer> bodyBuffer = Observable.just(Buffer.buffer(inRecord.toString()));
Single<HttpResponse<Buffer>> request = client
.post("xxxxxx", "xxxxxx", "xxxxxx")
.rxSendStream(bodyBuffer);
// So we can capture how long each request took.
final long startTime = System.currentTimeMillis();
return request.toFlowable()
.doOnNext(response -> {
// Capture total time and print it with the logs. Removed below for brevity.
long processTimeMs = System.currentTimeMillis() - startTime;
int status = response.statusCode();
if(status == 200)
logger.info("Success!");
else
logger.error("Failed!");
}).doOnError(ex -> {
long processTimeMs = System.currentTimeMillis() - startTime;
logger.error("Failed! Exception.", ex);
}).doOnTerminate(() -> {
// Do some extra stuff here...
}).onErrorResumeNext(Flowable.empty()); // This will allow us to continue.
}
).subscribe(); // Don't handle here. We subscribe to the inner events.
Upvotes: 1