MD10172016

Reputation: 43

Spring Kafka - Encountering "Magic v0 does not support record headers" error

I'm running a Spring Boot application and have pulled in compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE').

I'm trying to produce to a Cloudera installation with this version:

Cloudera Distribution of Apache Kafka 3.0.0-1.3.0.0.p0.40 (Kafka 0.11.0+kafka3.0.0+50)

My KafkaProducerConfig class is pretty straightforward:

import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.template.default-topic}")
    private String defaultTopicName;

    @Value("${spring.kafka.producer.compression-type}")
    private String producerCompressionType;

    @Value("${spring.kafka.producer.client-id}")
    private String producerClientId;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
        // Suppress the __TypeId__ record header that JsonSerializer adds by default
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

        return props;
    }

    @Bean
    public ProducerFactory<String, Pdid> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Pdid> kafkaTemplate() {
        KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());

        kafkaTemplate.setDefaultTopic(this.defaultTopicName);

        return kafkaTemplate;
    }

    @PostConstruct
    public void postConstruct() {
        LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
        LOGGER.info("Kafka topic name: " + this.defaultTopicName);
    }
}

When I start up the application, I receive:

2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e

Then I send in a payload. It shows up in Kafka Tool against the topic, but when the data is being ingested on the Kafka side, the logs show:

[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}
java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)

I've tried the following things from the Producer application side:

  1. Downgrading to Spring Kafka 2.0.4. I was hoping that coming down to a client matching the broker's 0.11.0 version would get rid of the problem, but it had no effect.
  2. Verified that the broker nodes are all the same version. According to my admin, they are.
  3. Verified with my admin that we don't have a mixed installation. Again, I was told that we do not.
  4. Based on a similar Stack Overflow question, I came back up to 2.1.5 and set JsonSerializer.ADD_TYPE_INFO_HEADERS to false, thinking it would remove the headers the log was referring to (an equivalent serializer-level configuration is sketched after this list). Again, no go; the same error was logged.
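
For reference, an equivalent way to suppress the type headers is to hand a pre-configured JsonSerializer instance to the producer factory rather than relying on the ADD_TYPE_INFO_HEADERS property alone. This is only a sketch against the spring-kafka 2.1.x API; Pdid is the application's own domain type:

import java.util.Collections;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Bean
public ProducerFactory<String, Pdid> producerFactory() {
    JsonSerializer<Pdid> valueSerializer = new JsonSerializer<>();
    // configure() honors ADD_TYPE_INFO_HEADERS, so no __TypeId__ header is written
    valueSerializer.configure(
            Collections.singletonMap(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
            false); // false = configuring the value serializer, not the key serializer
    return new DefaultKafkaProducerFactory<>(producerConfigs(),
            new StringSerializer(), valueSerializer);
}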

I'm hoping I'm missing something obvious. Is anyone aware of additional headers I need to turn on or off to resolve the Magic v0 issue?

We have other applications that write to other topics within the same environment successfully, but they are older applications that hand-craft the necessary Spring beans. Additionally, those applications use a much older client (0.8.2.2) and a StringSerializer for the producer value instead of JSON. I need my data to be in JSON, and I really don't want to downgrade to 0.8.2.2 when we are on a system that should support 0.11.x.

Upvotes: 3

Views: 4252

Answers (3)

bzani

Reputation: 527

I upgraded my application to Spring Boot 2.x and had some compatibility issues with the kafka-clients dependency (see the Spring Boot / Spring Kafka compatibility matrix), so I had to upgrade that too. On the other hand, I had an older broker (Kafka 0.10) running on the server, and I was then unable to send messages to it. I also realized that, even with JsonSerializer.ADD_TYPE_INFO_HEADERS set to false, the Kafka producer was setting headers internally, and since the magic value is fixed by the client's version (in RecordBatch), there was no way in this case not to fall into this condition in MemoryRecordsBuilder.appendWithOffset:

if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
    throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

In the end, the only way for me to solve this was to upgrade my Kafka server.

Upvotes: 0

MD10172016

Reputation: 43

The solution to the problem was a combination of two things:

  1. I needed to set JsonSerializer.ADD_TYPE_INFO_HEADERS to false, just as Gary Russell suggested.
  2. I needed to flush all the records that had been put into the topic before that configuration change was deployed. The prior records still carried the headers, which was breaking the Flume consumer. One way to purge them is sketched below.
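
One way to do that flush (a sketch only: it assumes delete.topic.enable=true on the brokers, that losing the topic's existing data is acceptable, and the bootstrap address is a placeholder) is to delete and recreate the topic with the AdminClient that ships with kafka-clients 0.11+:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicFlush {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Drop the topic and every record in it, including the header-bearing ones
            admin.deleteTopics(Collections.singleton("profiles-pdid")).all().get();

            // Deletion is asynchronous on the broker side; in practice you may need
            // to wait for it to complete before recreating the topic.
            // Partition/replication counts here are illustrative.
            admin.createTopics(Collections.singleton(new NewTopic("profiles-pdid", 1, (short) 1)))
                 .all().get();
        }
    }
}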

Upvotes: 1

Gary Russell

Reputation: 174729

but they are older applications that are hand-crafting the necessary Spring beans.

at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)

I am not familiar with kafka broker internals, but it "sounds" like the topics were created with an old broker and their message format doesn't support headers, rather than the broker version itself being the problem (hint: downConvert).

Have you tried this with a clean broker?

The 1.0.x client can talk to older brokers (back to 0.10.2.x IIRC) as long as you don't try to use features that are not supported by the broker. The fact that your broker is 0.11 (which does support headers) is a further indication that it's the topic record format that's the issue.

I have successfully tested up/down broker/client/topic versions without problems, as long as only the common feature subset is used.

JsonSerializer.ADD_TYPE_INFO_HEADERS to false.

That should prevent the framework from setting any headers; you need to show your producer code (and all the configuration).

You could also add a ProducerInterceptor to the producer config and examine the ProducerRecord headers property in the onSend() method, to figure out what's setting headers in the output message.
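
A minimal sketch of such an interceptor (the class name and logging are illustrative; it reuses the question's Pdid value type):

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeaderLoggingInterceptor implements ProducerInterceptor<String, Pdid> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeaderLoggingInterceptor.class);

    @Override
    public ProducerRecord<String, Pdid> onSend(ProducerRecord<String, Pdid> record) {
        // Log every header so you can see what is populating them
        for (Header header : record.headers()) {
            LOGGER.info("Outgoing header: {} ({} bytes)", header.key(),
                    header.value() == null ? 0 : header.value().length);
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // no-op
    }

    @Override
    public void close() {
        // no-op
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op
    }
}

It is registered through the standard producer property, e.g. in producerConfigs():

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, HeaderLoggingInterceptor.class.getName());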

If you are using spring-messaging messages (template.send(Message<?> m)), headers will be mapped by default. Using the raw template.send() methods will not set any headers (unless you send a ProducerRecord with its headers populated).
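
To make the distinction concrete, a small sketch (topic name taken from the question; the wrapper class is illustrative):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class SendExamples {

    private final KafkaTemplate<String, Pdid> kafkaTemplate;

    public SendExamples(KafkaTemplate<String, Pdid> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithMappedHeaders(Pdid pdid) {
        // spring-messaging path: the Message's headers are mapped onto
        // Kafka record headers by default
        Message<Pdid> message = MessageBuilder.withPayload(pdid)
                .setHeader(KafkaHeaders.TOPIC, "profiles-pdid")
                .build();
        kafkaTemplate.send(message);
    }

    public void sendWithoutHeaders(Pdid pdid) {
        // raw path: the template adds no headers itself (though JsonSerializer
        // still adds its __TypeId__ header unless ADD_TYPE_INFO_HEADERS is false)
        kafkaTemplate.send("profiles-pdid", pdid);
    }
}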

Upvotes: 2
