S. Harwood

Reputation: 58

Unable to send message with KafkaNull as Value

I am building a Kafka application that uses log compaction on a topic, but I am not able to send a tombstone value (KafkaNull).

I first tried the default serializer configuration. When that did not work, I applied the changes suggested in "Publish null/tombstone message with raw headers" and set application.properties to:

spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

The code I use to send a message to the stream is:

this.stockTopics.compactedStocks().send(MessageBuilder
    .withPayload(KafkaNull.INSTANCE)
    .setHeader(KafkaHeaders.MESSAGE_KEY, company.getBytes())
    .build());

this.stockTopics.compactedStocks() returns a message stream that I can send messages to.

Every time I try to send that message with a KafkaNull instance as the payload, I get this error:

Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' to outbound message.

I expect the message to be sent to the consumer with a null value, but instead the send fails with the error above.

Upvotes: 3

Views: 2280

Answers (1)

Gary Russell

Reputation: 174494

I opened a GitHub issue for this.

EDIT

Workaround - this works...

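import java.util.Collections;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamMessageConverter;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication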
@EnableBinding(Source.class)
public class So54257687Application {

    public static void main(String[] args) {
        SpringApplication.run(So54257687Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> output.send(new GenericMessage<>(KafkaNull.INSTANCE));
    }

    @KafkaListener(id = "foo", topics = "output")
    public void listen(@Payload(required = false) byte[] in) {
        System.out.println(in);
    }

    @Bean
    @StreamMessageConverter
    public MessageConverter kafkaNullConverter() {
        class KafkaNullConverter extends AbstractMessageConverter {

            KafkaNullConverter() {
                super(Collections.emptyList());
            }

            @Override
            protected boolean supports(Class<?> clazz) {
                return KafkaNull.class.equals(clazz);
            }

            @Override
            protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
                return message.getPayload();
            }

            @Override
            protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
                return payload;
            }

        }
        return new KafkaNullConverter();
    }

}
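
For background on why the null value matters: on a compacted topic, a record with a null value acts as a tombstone, and compaction eventually drops the key. The sketch below is a Kafka-free, in-memory model of that behaviour, not part of the workaround. CompactionSketch, LogRecord and compact are hypothetical names made up for illustration, and the stock symbols are sample data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of log compaction; none of these names are Kafka APIs.
public class CompactionSketch {

    // One log entry; a null value plays the role of a tombstone.
    static final class LogRecord {
        final String key;
        final String value;

        LogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keeps the latest value per key and drops keys whose latest record is a tombstone.
    static Map<String, String> compact(List<LogRecord> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (LogRecord r : log) {
            if (r.value == null) {
                latest.remove(r.key);       // tombstone: forget the key
            } else {
                latest.put(r.key, r.value); // newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<LogRecord> log = new ArrayList<>();
        log.add(new LogRecord("ACME", "10.0"));
        log.add(new LogRecord("INIT", "7.5"));
        log.add(new LogRecord("ACME", "11.2"));
        log.add(new LogRecord("INIT", null)); // tombstone for INIT
        System.out.println(compact(log));     // prints {ACME=11.2}
    }
}
```

A real broker compacts in the background and keeps tombstones for a configurable period before removing them, so this only models the end state.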

Upvotes: 5
