Reputation: 344
I am trying to set up a binding to forward Kafka messages from the Spring Integration errorChannel to a custom channel (for centralised error processing).
The error messages are being sent to the configured channel, but they are arriving as a GenericMessage with a byte[] payload, which consists of the exception details and the failed message.
My config:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          accountOut.producer:
            sync: true
        binder:
          autoCreateTopics: false
          headers:
            - spanId
            - spanTraceId
            - spanSampled
            - spanParentSpanId
            - spanName
            - spanFlags
            - eventType
            - Authorization
      bindings:
        error:
          destination: test-error
        accountOut:
          producer.partitionKeyExpression: payload.key
          content-type: application/json
          destination: account
  kafka:
    producer.keySerializer: org.apache.kafka.common.serialization.StringSerializer
    consumer.valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
I'm listening with a @StreamListener(target = "kieran-error"), and the consumer is configured with @Input("kieran-error") SubscribableChannel.
Reading the docs, I was expecting the message to arrive as an ErrorMessage. Is there a way I can achieve this? Or to configure the payload to arrive as an Object?
Versions I'm using:
--- Question Update ---
I realise now that I could configure Spring Integration to forward to a Kafka topic by listening to the errorChannel, e.g.
@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler handler() throws Exception {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setTopicExpression(new LiteralExpression("someTopic"));
    handler.setMessageKeyExpression(new LiteralExpression("someKey"));
    return handler;
}
But is it possible to configure this flow in the properties YAML rather than in code? That's where all the other Kafka config lives, so configuring the Kafka template in code is not ideal.
Another option would be to listen for the ErrorMessage explicitly and send it to a Kafka output channel in the code:
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    outputChannel.kieranError().send(...)
}
Upvotes: 0
Views: 3371
Reputation: 174799
Exactly where are you consuming such a message?
The message you describe sounds like a message sent to the DLQ topic when enableDlq is true for a consumer; you show no consumer configuration so it's hard for me to guess.
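For reference, a consumer with the DLQ enabled might be configured roughly as follows. This is only a sketch: the binding name accountIn and the group account-service are hypothetical (the question shows no consumer configuration), and it assumes a Kafka binder version that supports the enableDlq and dlqName consumer properties.

```yaml
spring:
  cloud:
    stream:
      bindings:
        accountIn:                  # hypothetical input binding name
          destination: account
          group: account-service    # hypothetical consumer group
      kafka:
        bindings:
          accountIn:
            consumer:
              enableDlq: true       # failed messages are published to a DLQ topic
              dlqName: test-error   # optional; if omitted the binder derives a name from destination and group
```

With a setup like this, whatever lands on the DLQ topic is a record written by the binder, not the in-memory ErrorMessage, which would be consistent with seeing a byte[] payload downstream.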
The ErrorMessage sent to the destination-specific error channel (and bridged to the global errorChannel) can be consumed with
@ServiceActivator(inputChannel = "errorChannel")
public void handle(ErrorMessage em) {
    ...
}
The

error:
  destination:

binding is legacy and was intended for user code to send messages to the errorChannel that would then go to that topic.
Upvotes: 1