LuxusIo

Reputation: 5

Manual ack after message send with Spring Cloud Stream + RabbitMQ (Functional)

I found out how to do a manual ack with the functional style.

However, that does not explain how to send the ack after the message has been sent.
If a network error occurs between the ack and the sending of the message, I think the message will be lost.
I don't want to lose messages.

Versions

This is my current code

@Controller
class MyFunction(private val myService: MyService) {
    private val objectMapper = ObjectMapper()

    init {
        objectMapper.registerKotlinModule()
    }
    
    @Bean
    fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
        val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
        val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()
        
        val request = objectMapper.readValue(it.payload, MyRequest::class.java)
        
        try {
            val response = myService.process(request)
            
            channel.basicAck(deliveryTag, false)
            // I think it can potentially cause message loss
            MessageBuilder.withPayload(response)
                .build()
        } catch (ex: Exception) {
            channel.basicNack(deliveryTag, false, false)
            throw ex
        }
    }
}

Upvotes: 0

Views: 667

Answers (1)

Gary Russell

Reputation: 174484

Use a Consumer to receive the message and StreamBridge to send (instead of using a Function).

But why do you need manual acks anyway? In auto mode, throwing an exception causes the container to nack the message, and a normal exit causes it to ack.
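
A minimal sketch of the Consumer + StreamBridge approach, assuming the MyService, MyRequest and MyResponse types from the question. The binding name "response-out-0" and the class name MyConsumerConfig are hypothetical, and the sketch assumes the input binding is configured with acknowledge-mode=MANUAL so that the channel and delivery tag headers are usable for acking. The point is only the ordering: send first, ack afterwards.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import com.rabbitmq.client.Channel
import org.springframework.amqp.support.AmqpHeaders
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

// MyService, MyRequest and MyResponse come from the question.
// "response-out-0" is a hypothetical output binding name.
@Configuration
class MyConsumerConfig(
    private val myService: MyService,
    private val streamBridge: StreamBridge,
) {
    private val objectMapper = ObjectMapper().registerKotlinModule()

    @Bean
    fun consume(): Consumer<Message<ByteArray>> = Consumer { message ->
        val channel = message.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
        val deliveryTag = message.headers
            .get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()

        try {
            val request = objectMapper.readValue(message.payload, MyRequest::class.java)
            val response = myService.process(request)

            // Send the reply first; treat a false return value as a failure.
            check(streamBridge.send("response-out-0", response)) { "Sending the response failed" }

            // Ack the incoming message only after the send has gone through.
            channel.basicAck(deliveryTag, false)
        } catch (ex: Exception) {
            // Reject without requeue, as in the question's code.
            channel.basicNack(deliveryTag, false, false)
            throw ex
        }
    }
}
```

With the default auto acknowledge mode, the channel and delivery tag handling could be dropped entirely: the Consumer would just call streamBridge.send and let any exception propagate. Note that a successful send here does not by itself prove that the broker has persisted the outgoing message; that would need publisher confirms, which this sketch does not cover.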

Upvotes: 1
