Reputation: 540
I need help understanding why the error message is not logged when I subscribe to the stream with .subscribe(), even though it is logged when I subscribe with .block(). I understand that this might be due to the block() call subscribing on the main thread. What should I change in order to see the error logged when subscribing with .subscribe()? I am also not sure whether this is the best way to listen for SQS messages in WebFlux. Thanks.
@SqsListener(value = "${sqs.queues.email-notifications}", deletionPolicy = NEVER)
public void listenNotifications(MessageDto message, Acknowledgment acknowledgment) {
    Mono.just(message)
        .log("NotificationsSQSConsumer.listenNotifications")
        .map(this::toSendEmailCommand)
        .flatMap(sendEmailUseCase::handle)
        .then(sendAck(acknowledgment))
        .log("NotificationsSQSConsumer.sendAck")
        .subscribe(); // the error is not logged here; with .block() instead, it is
}
private Mono<?> sendAck(Acknowledgment acknowledgment) {
    return Mono.fromCallable(() -> acknowledgment.acknowledge().get())
        .doOnError(throwable -> log.error("ERROR!:" + throwable.getMessage()))
        .subscribeOn(Schedulers.boundedElastic());
}
class SendEmailUseCase {
    public Mono<Void> handle(SendEMailCommand sendEMailCommand) {
        return Mono.just(sendEMailCommand.getTemplateId())
            .flatMap(templateLoader::getTemplate) // runs a Mono.fromCallable on Schedulers.boundedElastic()
            .map(s -> Tuples.of(new StringReader(s), sendEMailCommand.getTemplateId()))
            .map(this::newMustache)
            .map(c -> Tuples.of(c, sendEMailCommand.getTemplateData()))
            .map(this::build)
            .map(mailText -> Tuples.of(sendEMailCommand.getReceiver(), sendEMailCommand.getSubject(), mailText))
            .map(this::newEmailMessage)
            .flatMap(emailSender::sendMail); // runs a Mono.fromCallable on Schedulers.boundedElastic()
    }
}
Logged error (JSON):
{
"dd.span_id":"0",
"dd.trace_id":"0",
"timestamp":"2021-03-04T15:53:00.588 03:00",
"event_type":"logging",
"level":"ERROR",
"thread_name":"boundedElastic-1",
"logger_name":"c.n.c.m.i.m.NotificationsSQSConsumer",
"message":"ERROR!:com.amazonaws.services.sqs.model.AmazonSQSException: Value AQEB/60/Nzg2oC9tdLlZT4BiztR8UAgd503EZ/bHQ7bE6WuKRfF59Y5l/2+gmqMZtSJs8sug6qUNuUt7pmXVM8G2S3TVt9yVc05t
}
Upvotes: 2
Views: 431
Reputation: 540
I realized that I was missing the error callback on .subscribe():
@Async
@SqsListener(value = "${sqs.queues.email-notifications}", deletionPolicy = NEVER)
public void handle(MessageDto message, Acknowledgment acknowledgment) {
    Mono.just(message)
        .log("NotificationsSQSConsumer.listenNotifications")
        .map(this::toSendEmailCommand)
        .flatMap(sendEmailUseCase::handle)
        .then(sendAck(acknowledgment))
        .log("NotificationsSQSConsumer.sendAck")
        .subscribe(
            o -> { },
            t -> log.error("ERROR!:" + t.getMessage()));
}
private Mono<?> sendAck(Acknowledgment acknowledgment) {
    return Mono.fromCallable(() -> acknowledgment.acknowledge().get())
        .subscribeOn(Schedulers.boundedElastic());
}
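To try the error callback in isolation, here is a minimal sketch that assumes reactor-core is on the classpath. The class name SubscribeErrorCallbackSketch and the failingAck() method are hypothetical: failingAck() only stands in for acknowledgment.acknowledge().get() throwing, so no SQS setup is needed. The latch is there only so the calling thread waits for the work scheduled on boundedElastic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SubscribeErrorCallbackSketch {

    // Hypothetical stand-in for the blocking acknowledge call: it always fails.
    static Mono<String> failingAck() {
        return Mono.<String>fromCallable(() -> {
                    throw new IllegalStateException("ack failed");
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Subscribes with both a value consumer and an error consumer, then
    // returns the message of the error that the error consumer received.
    static String subscribeAndCaptureError() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> captured = new AtomicReference<>();

        failingAck().subscribe(
                value -> { },                       // nothing to do on success
                error -> {                          // error callback
                    captured.set(error.getMessage());
                    done.countDown();
                });

        // subscribe() returns immediately; wait for the boundedElastic thread.
        done.await(5, TimeUnit.SECONDS);
        return captured.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("error seen by callback: " + subscribeAndCaptureError());
    }
}
```

In the listener above, the same second argument to .subscribe() is where the log.error call goes. Logging the throwable itself, for example log.error("ERROR!", t), would also keep the stack trace.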
Upvotes: 2