Reputation: 31
I am writing a producer to push messages to a Kinesis stream using the Spring Cloud Stream libraries. I can push data to Kinesis successfully, but on the Kinesis side it fails with a throughput exceeded exception. Is there a way to retry pushing these messages and to know exactly which message failed? Also, I don't want to use KPL or KCL.
I tried the solution suggested in the answer and this is my config:
spring.cloud.stream.bindings.input.producer.errorChannelEnabled: true
spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.myGroup.errors
Is this the right way to do it? If so, how do I map "spring.cloud.stream.bindings.input.producer.error.destination: myFooDestination.myGroup.errors" to the "error-channel" property in a Spring Integration config like the one below?
<int-http:inbound-channel-adapter id="abcErrorChannel"
    channel="defChannel"
    error-channel="errorChannel">
</int-http:inbound-channel-adapter>
Upvotes: 1
Views: 878
Reputation: 55
Consumer groups come into the picture only for consumers. On the producer side there is no such concept as a group. To bind to the error channel on the producer side, use the two properties below, together with the code that follows them.
spring.cloud.stream.bindings.output.destination=kinesis-stream
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
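import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

// Receives send failures for the "kinesis-stream" destination.
@ServiceActivator(inputChannel = "kinesis-stream.errors")
public void receiveProduceError(Message<?> receiveMsg) {
    System.err.println("receive error msg: " + receiveMsg);
}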
Upvotes: 0
Reputation: 121560
The PutRecord(s) request in the AWS Kinesis Binder is fully based on AmazonKinesisAsync, and the send to AWS is indeed async. So we can't use the built-in RetryTemplate feature there. But at the same time, the error of that async operation is sent to the destination-specific errorChannel: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producer_properties
errorChannelEnabled
When set to true, if the binder supports asynchronous send results, send failures are sent to an error channel for the destination. See “[binder-error-channels]” for more information.
Default: false.
The channel name is based on the destination and consumer group, plus the errors suffix: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-overview-error-handling
Also, in the event you are binding to an existing destination such as:
spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup
the full destination name is myFooDestination.myGroup, and the dedicated error channel name is myFooDestination.myGroup.errors.
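Since the built-in RetryTemplate does not apply to the async send, retrying has to be done by hand once a failure shows up on the error channel. Below is a minimal sketch of that idea using only the JDK. It is not part of the binder: `RetryingSender`, its constructor parameters, and the `asyncSend` function are hypothetical names introduced for illustration, and `asyncSend` stands in for whatever actually re-sends the record (for example, sending the payload to the output channel again). In Spring, the message on the error channel is normally an ErrorMessage whose payload is a MessagingException, and `getFailedMessage()` on that exception gives the message that failed; its payload is what you would pass to `send` here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical helper: retries an async send with exponential backoff.
final class RetryingSender {

    // Stand-in for the real async send; an assumption, not a binder API.
    private final Function<String, CompletableFuture<Void>> asyncSend;
    private final int maxAttempts;
    private final long initialBackoffMillis;

    RetryingSender(Function<String, CompletableFuture<Void>> asyncSend,
                   int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.asyncSend = asyncSend;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    // Returns the number of attempts used; throws if every attempt fails.
    int send(String payload) {
        long backoff = initialBackoffMillis;
        Throwable lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Wait for the async result so a failure is visible here.
                asyncSend.apply(payload).get();
                return attempt;
            } catch (ExecutionException e) {
                lastFailure = e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while sending: " + payload, e);
            }
            if (attempt < maxAttempts) {
                try {
                    // Back off before retrying, doubling the wait each time.
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off: " + payload, e);
                }
                backoff *= 2;
            }
        }
        throw new IllegalStateException("Giving up on payload: " + payload, lastFailure);
    }
}
```

Blocking on the future keeps the sketch short, so call it from a thread that is allowed to block. Because the payload is passed in explicitly and is included in the final exception message, you always know which message failed.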
Upvotes: 1