Reputation: 108
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.SR3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
@Component
public class QueueConsumer {
/** The Constant LOG. */
public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);
/** The processor. */
@Autowired
private IMessageProcessor processor;
/**
* Consume.
*
* @param message the message
*/
@StreamListener(value = OrderEventSink.ORDER_EVENT)
public void consume(Message<String> message) {
try {
processor.process(message);
} catch (MessageProcessingFailedException e) {
LOG.error("Error Code "+ e.getCode().getCode() + " " + e.getCode().getDescription(), e);
throw e;
}
}
}
spring.cloud.stream.bindings.orderEvent.destination=orderEvents
spring.cloud.stream.bindings.orderEvent.content-
type=application/json
spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
spring.cloud.stream.bindings.orderEvent.consumer.back-off-
multiplier=5
spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial-
interval=60000
spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
spring.cloud.stream.bindings.kafka.binder.brokers=localhost
spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
spring.cloud.stream.kafka.bindings.orderEvent.consumer.
enableDlq=true
spring.cloud.stream.kafka.bindings.orderEvent.consumer.
dlqName=dead-queue
spring.cloud.stream.kafka.bindings.orderEvent.consumer.
dlqProducerProperties.configuration.key.
serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.orderEvent.consumer.
dlqProducerProperties.configuration.value.
serializer=org.apache.kafka.common.serialization.StringSerializer
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'scm-orderEvents.scm-orderEvents-consumer.errors'; nested exception is java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162] Caused by: java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:380) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] ... 16 common frames omitted Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297 at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_162] at java.lang.String.(String.java:425) ~[na:1.8.0_162] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:368) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] ... 20 common frames omitted
Upvotes: 0
Views: 1928
Reputation: 174494
This is a bug in the 1.3.2.RELEASE of the kafka binder; it is fixed on master (1.3.3.BUILD-SNAPSHOT).
BTW, the best solution is to use Spring Boot 2.0.1 and SCSt Emlhurst.RELEASE (pulled in by cloud FINCHLEY - currently at M9 milestone).
These versions have native support for Kafka 1.0.
You might also have some success moving to the kafka11 binder artifact (1.3.0) which is compatible with SCSt 1.3.x, as discussed on the Wiki.
Upvotes: 1