DerM

Reputation: 1526

Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/

docker-compose up works fine, and creating topics via the shell works fine.

Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE

When starting up the Spring application it prints the correct version of Kafka:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

I try to send a message like this:

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Sending on client side fails with

UnknownServerException: The server experienced an unexpected error when processing the request

In the server console I get the message Magic v1 does not support record headers

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Googling suggests a version conflict, but the versions seem to fit (org.apache.kafka:kafka-clients:1.0.0 is in the classpath).

Any clues? Thanks!

Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending JSON via the JsonSerializer results in the given problem. Here is the content of my producer config:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())

Upvotes: 12

Views: 20920

Answers (6)

Playtodie

Reputation: 1

I found my problem: I noticed that I wasn't getting this error when I was using the sendDefault(object) method, only when using the send(topic, object) method.

I did a little digging and noticed that the framework generates the ProducerRecord itself when you use sendDefault, but not when you use send; the generated record is a "clean" one with no headers.

So my solution was to create a new ProducerRecord before calling the send method. Hope this helps someone.

ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, serializedMessage);
kafkaProducerClient.send(record);

Note: I'm using a ByteSerializer, so the byte[] part may need to be different in your code.
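
For comparison, the sendDefault path mentioned above would look roughly like this (a sketch reusing the same kafkaProducerClient template; the default topic has to be configured first):

// sendDefault() sends to the template's configured default topic,
// so no per-call topic is passed (and, per the observation above, no header issue)
kafkaProducerClient.setDefaultTopic(topic);
kafkaProducerClient.sendDefault(serializedMessage);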

Upvotes: 0

aksh1618

Reputation: 2571

TL;DR: Try adding this property:

# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false

Full answer: I tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);, but it was still not working. On further debugging inside the send method, I discovered that a B3 header was part of the final record being sent. B3 headers are used for tracing purposes, and indeed I had Sleuth as part of the project.

To remove this header, I tried disabling the Kafka-tracing-related AutoConfiguration classes, but that didn't work. I also tried setting the following properties, neither of which worked:

# These didn't work
#spring.sleuth.kafka.enabled=false
#spring.sleuth.messaging.kafka.streams.enabled=false

After some further debugging and trial and error, I discovered that the producer was being wrapped via a bean post processor, roughly in this flow:

org.springframework.cloud.sleuth.brave.instrument.messaging.KafkaFactoryBeanPostProcessor::postProcessAfterInitialization
-> org.springframework.cloud.sleuth.brave.instrument.messaging.TraceProducerPostProcessor::wrapInTracing
  -> brave.kafka.clients.KafkaTracing::producerInjector
    -> brave.kafka.clients.KafkaProducerRequest::SETTER::put

I still couldn't figure out how to disable KafkaFactoryBeanPostProcessor, but I saw an annotation defined in the same package, ConditionalOnMessagingEnabled, which depended on the following property. Setting it finally worked!

# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false

Upvotes: 0

Kshitiz Agarwal

Reputation: 1

If you are using a Kafka version <= 0.10.x.x, you must set JsonSerializer.ADD_TYPE_INFO_HEADERS to false in your producer factory properties, as below:

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);


If you are using a Kafka version > 0.10.x.x, it should just work fine.

Upvotes: 0

Vasyl Sarzhynskyi

Reputation: 3955

I had a similar issue. Spring Kafka's JsonSerializer (and JsonSerde) adds type info headers to records by default. To prevent this issue, we need to disable adding those info headers.

If you are fine with the default JSON serialization, then use the following (the key point here is ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

But if you need a custom JsonSerializer with a specific ObjectMapper (for example one with PropertyNamingStrategy.SNAKE_CASE), you should disable adding type info headers explicitly on the JsonSerializer, because Spring Kafka ignores DefaultKafkaProducerFactory's ADD_TYPE_INFO_HEADERS property when a serializer instance is supplied (in my opinion, that's bad design in Spring Kafka):

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

or if we use JsonSerde, then:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
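
For reference, the customObjectMapper used above could be built roughly like this (a sketch using plain Jackson; the snake_case strategy is just the example mentioned earlier):

// Jackson's ObjectMapper and PropertyNamingStrategy (com.fasterxml.jackson.databind);
// this mapper writes/reads Java camelCase properties as snake_case JSON fields
ObjectMapper customObjectMapper = new ObjectMapper();
customObjectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);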

Upvotes: 23

DerM

Reputation: 1526

Solved. The problem was neither the broker, some Docker cache, nor the Spring app.

The problem was a console consumer that I was using in parallel for debugging. This was an "old" consumer started with kafka-console-consumer.sh --topic=topic --zookeeper=...

It actually prints a warning when started: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

A "new" consumer with the --bootstrap-server option should be used instead (especially when using Kafka 1.0 with the JsonSerializer). Note: using an old consumer here can indeed affect the producer, because the broker has to down-convert the records to the old message format (magic v1) for the old consumer, and magic v1 does not support the record headers that the JsonSerializer adds.
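
For example, the "new" consumer can be started like this (a sketch; localhost:9092 is an assumed broker address, substitute whatever your docker-compose setup exposes):

kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092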

Upvotes: 2

Gary Russell

Reputation: 174719

I just ran a test against that docker image with no problems...

$ docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1


import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}


spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

EDIT

Still works for me...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

Upvotes: 1
