Saurabh

Reputation: 31

Retry pushing messages to Kinesis using Spring Cloud Stream Libraries

I am writing a producer that pushes messages to a Kinesis stream using the Spring Cloud Stream libraries. I can push data to Kinesis successfully, but on the Kinesis side some puts fail with a throughput exceeded exception. Is there a way to retry pushing these messages and to know exactly which message failed? Also, I don't want to use KPL or KCL.

I tried the solution suggested in the answer and this is my config:

spring.cloud.stream.bindings.input.producer.errorChannelEnabled: true
spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.myGroup.errors

Is this the right way to do it? If so, how do I map "spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.myGroup.errors" to the "error-channel" property in the Spring Integration config shown below?

<int-http:inbound-channel-adapter id="abcErrorChannel"
                                  channel="defChannel"
                                  error-channel="errorChannel">
</int-http:inbound-channel-adapter>

Upvotes: 1

Views: 878

Answers (2)

kamakhya mishra

Reputation: 55

Consumer groups come into the picture only for consumers. On the producer side there is no concept of groups. To bind to the error channel on the producer side, use the two properties below together with the handler code that follows.

spring.cloud.stream.bindings.output.destination=kinesis-stream
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true

// The producer error channel is named <destination>.errors
@ServiceActivator(inputChannel = "kinesis-stream.errors")
public void receiveProduceError(Message<?> receiveMsg) {
    System.err.println("receive error msg: " + receiveMsg);
}
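
The handler above only prints the failure. If a retry is wanted, one option is to re-send the failed payload a bounded number of times with a growing delay between attempts. The following is a minimal sketch in plain Java, not part of Spring Cloud Stream or the Kinesis binder: BoundedRetry, resend and the sender callback are hypothetical names, and the callback stands in for whatever actually pushes the payload to the output channel.

```java
import java.util.function.Predicate;

// Hypothetical helper for illustration only; not a Spring or AWS API.
final class BoundedRetry {

    private BoundedRetry() {
    }

    /**
     * Re-sends a payload until the sender reports success or maxAttempts is reached.
     * Returns the number of attempts used on success, or -1 if every attempt failed
     * or the thread was interrupted while waiting.
     */
    static <T> int resend(T payload, Predicate<T> sender, int maxAttempts, long initialBackoffMillis) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sender.test(payload)) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return -1;
                }
                backoff *= 2; // double the wait so a throttled shard has time to recover
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated sender that is rejected twice before it succeeds.
        int attempts = resend("record-1", p -> ++calls[0] >= 3, 5, 10L);
        System.out.println("attempts used: " + attempts);
    }
}
```

Inside a real handler, the payload to re-send would come from the error message itself. If the error payload is a MessagingException (an assumption about what the binder publishes), its getFailedMessage() method returns the message that could not be sent, which also answers the question of which message failed.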

Upvotes: 0

Artem Bilan

Reputation: 121560

The PutRecord(s) request in the AWS Kinesis Binder is based entirely on AmazonKinesisAsync, so the send to AWS is indeed asynchronous. For that reason the built-in RetryTemplate feature can't be used there. However, the error from that async operation is sent to the destination-specific errorChannel: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producer_properties

errorChannelEnabled

When set to true, if the binder supports asynchronous send results, send failures are sent to an error channel for the destination. See “[binder-error-channels]” for more information.

Default: false.

The channel name is based on the destination and the consumer group, plus the errors suffix: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-overview-error-handling

Also, in the event you are binding to the existing destination such as:

spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup

the full destination name is myFooDestination.myGroup and then the dedicated error channel name is myFooDestination.myGroup.errors.
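
To make the naming rule concrete, here is a small sketch that builds the error channel name the way this thread describes it: destination, then the group if there is one, then the errors suffix. ErrorChannelNames and errorChannelName are hypothetical names used only for illustration. Spring Cloud Stream resolves these names itself, so the helper is not something the framework requires or provides.

```java
// Hypothetical helper for illustration only; not a Spring Cloud Stream API.
final class ErrorChannelNames {

    private ErrorChannelNames() {
    }

    /**
     * Builds "<destination>.<group>.errors", or "<destination>.errors"
     * when no group applies (for example on the producer side).
     */
    static String errorChannelName(String destination, String group) {
        if (group == null || group.isEmpty()) {
            return destination + ".errors";
        }
        return destination + "." + group + ".errors";
    }

    public static void main(String[] args) {
        System.out.println(errorChannelName("myFooDestination", "myGroup"));
        System.out.println(errorChannelName("kinesis-stream", null));
    }
}
```

The resulting string is what would go into the inputChannel attribute of a @ServiceActivator that listens for errors.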

Upvotes: 1
