jorgebo10

Reputation: 540

Why does doOnError log differently when subscribing with .subscribe() in WebFlux?

I need help understanding why the error message is not logged when I subscribe to the stream with .subscribe(), although it is logged when I subscribe with .block(). I suspect this is because the block() call subscribes on the main thread. What should I change to see the error logged when subscribing with .subscribe()? I am also not sure whether this is the best way to listen for SQS messages in WebFlux. Thanks.

@SqsListener(value = "${sqs.queues.email-notifications}", deletionPolicy = NEVER)
public void listenNotifications(MessageDto message, Acknowledgment acknowledgment) {
    Mono.just(message)
        .log("NotificationsSQSConsumer.listenNotifications")
        .map(this::toSendEmailCommand)
        .flatMap(sendEmailUseCase::handle)
        .then(sendAck(acknowledgment))
        .log("NotificationsSQSConsumer.sendAck")
        .subscribe(); // error is not logged here; it is logged when this line is .block() instead
}

private Mono<?> sendAck(Acknowledgment acknowledgment) {
    return Mono.fromCallable(() -> acknowledgment.acknowledge().get())
        .doOnError(throwable -> log.error("ERROR!:" + throwable.getMessage()))
        .subscribeOn(Schedulers.boundedElastic());
}
    
class SendEmailUseCase {
    
    public Mono<Void> handle(SendEMailCommand sendEMailCommand) {
        return Mono.just(sendEMailCommand.getTemplateId())
                .flatMap(templateLoader::getTemplate) // getTemplate does a Mono.fromCallable on boundedElastic
                .map(s -> Tuples.of(new StringReader(s), sendEMailCommand.getTemplateId()))
                .map(this::newMustache)
                .map(c -> Tuples.of(c, sendEMailCommand.getTemplateData()))
                .map(this::build)
                .map(mailText -> Tuples.of(sendEMailCommand.getReceiver(), sendEMailCommand.getSubject(), mailText))
                .map(this::newEmailMessage)
                .flatMap(emailSender::sendMail); // sendMail does a Mono.fromCallable on boundedElastic
    }
}

// Log output (JSON)
{
  "dd.span_id":"0",
  "dd.trace_id":"0",
  "timestamp":"2021-03-04T15:53:00.588 03:00",
  "event_type":"logging",
  "level":"ERROR",
  "thread_name":"boundedElastic-1",
  "logger_name":"c.n.c.m.i.m.NotificationsSQSConsumer",
  "message":"ERROR!:com.amazonaws.services.sqs.model.AmazonSQSException: Value AQEB/60/Nzg2oC9tdLlZT4BiztR8UAgd503EZ/bHQ7bE6WuKRfF59Y5l/2+gmqMZtSJs8sug6qUNuUt7pmXVM8G2S3TVt9yVc05t
}

Upvotes: 2

Views: 431

Answers (1)

jorgebo10

Reputation: 540

I realized that I was missing the error callback on .subscribe():

    @Async
    @SqsListener(value = "${sqs.queues.email-notifications}", deletionPolicy = NEVER)
    public void handle(MessageDto message, Acknowledgment acknowledgment) {
        Mono.just(message)
                .log("NotificationsSQSConsumer.listenNotifications")
                .map(this::toSendEmailCommand)
                .flatMap(sendEmailUseCase::handle)
                .then(sendAck(acknowledgment))
                .log("NotificationsSQSConsumer.sendAck")
                .subscribe(o -> {
                }, t -> log.error("ERROR!:" + t.getMessage()));
    }

    private Mono<?> sendAck(Acknowledgment acknowledgment) {
        return Mono.fromCallable(() -> acknowledgment.acknowledge().get())
                .subscribeOn(Schedulers.boundedElastic());
    }
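
To see the difference in isolation, here is a minimal sketch that assumes reactor-core is on the classpath. The class SubscribeErrorCallbackSketch and the helper failingAck() are hypothetical stand-ins for the listener and sendAck(...) above, not part of the original code. It shows only two things: the two-argument subscribe(onNext, onError) hands the failure to the error consumer, and block() rethrows it on the calling thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeErrorCallbackSketch {

    // Hypothetical stand-in for sendAck(...): always fails on a boundedElastic thread.
    static Mono<String> failingAck() {
        return Mono.<String>fromCallable(() -> {
                    throw new IllegalStateException("ack failed");
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    // subscribe(onNext, onError): the error consumer receives the failure.
    static String subscribeWithErrorCallback() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        failingAck().subscribe(
                value -> done.countDown(),
                error -> {
                    seen.set(error.getMessage());
                    done.countDown();
                });
        // subscribe() returns immediately, so wait for the asynchronous signal.
        done.await(5, TimeUnit.SECONDS);
        return seen.get();
    }

    // block(): the failure is rethrown on the calling thread.
    static String blockAndCatch() {
        try {
            failingAck().block();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("subscribe error callback saw: " + subscribeWithErrorCallback());
        System.out.println("block() rethrew: " + blockAndCatch());
    }
}
```

In the listener itself, block() keeps the listener thread waiting until the pipeline finishes, while subscribe() returns immediately, so with subscribe() the error consumer is the place where a failure can be logged explicitly.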

Upvotes: 2
