Reputation: 1526
I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/
docker-compose up works fine, and creating topics via the shell works fine too.
Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE
When starting up the Spring application it prints the correct version of Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
I try to send a message like this:
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Sending fails on the client side with:
UnknownServerException: The server experienced an unexpected error when processing the request
In the server console I get the message "Magic v1 does not support record headers":
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Googling suggests a version conflict, but the versions seem to match (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).
Any clues? Thanks!
Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending JSON via JsonSerializer results in the problem described above. Here is my producer config:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
    HashMap<String, Any>().apply {
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
    }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
    DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
    KafkaTemplate(producerFactory())
Upvotes: 12
Views: 20920
Reputation: 1
I found my problem. I noticed that I got this error only when using the send(topic, object) method, not when using the sendDefault(object) method.
I did a little digging and noticed that the framework generates a ProducerRecord itself when you use sendDefault, which creates a "clean" record with no headers, but not when you use send.
So my solution was to create a new ProducerRecord myself before calling the send method. Hope this helps someone.
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, serializedMessage);
kafkaProducerClient.send(record);
Note: I'm using a ByteSerializer, so the byte[] type parameter may need to be different in your code.
Upvotes: 0
Reputation: 2571
TL;DR: Try adding this property:
# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false
Full answer:
I tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); but it was still not working. On further debugging inside the send method, I discovered that a B3 header was part of the final record being sent. B3 headers are used for tracing, and indeed I had Sleuth as part of the project.
To remove this header, I tried disabling the Kafka tracing related AutoConfiguration classes, but that didn't work. I also tried setting the following properties, neither of which worked:
# These didn't work
#spring.sleuth.kafka.enabled=false
#spring.sleuth.messaging.kafka.streams.enabled=false
After some further debugging and trial and error, I discovered that the producer was being wrapped via a bean post processor, roughly in this flow:
org.springframework.cloud.sleuth.brave.instrument.messaging.KafkaFactoryBeanPostProcessor::postProcessAfterInitialization
-> org.springframework.cloud.sleuth.brave.instrument.messaging.TraceProducerPostProcessor::wrapInTracing
-> brave.kafka.clients.KafkaTracing::producerInjector
-> brave.kafka.clients.KafkaProducerRequest::SETTER::put
I still couldn't figure out how to disable KafkaFactoryBeanPostProcessor, but I saw an annotation defined in the same package, ConditionalOnMessagingEnabled, which depends on the following property. Setting it finally worked!
# Disable addition of B3 headers in kafka producer records
spring.sleuth.messaging.enabled=false
Upvotes: 0
Reputation: 1
You are using Kafka version <= 0.10.x.x. With that version, you must set JsonSerializer.ADD_TYPE_INFO_HEADERS to false, as below:
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
Use these as your producer factory properties.
If you are using Kafka version > 0.10.x.x, it should just work.
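For reference, here is a dependency-free sketch of the same producer properties written with plain string keys instead of the constants. It assumes that JsonSerializer.ADD_TYPE_INFO_HEADERS resolves to the string "spring.json.add.type.headers" and that the serializers can be named by fully qualified class name. The class name ProducerPropsSketch and the localhost:9092 address are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ProducerPropsSketch {

    // Assumed string value behind JsonSerializer.ADD_TYPE_INFO_HEADERS in spring-kafka.
    static final String ADD_TYPE_INFO_HEADERS = "spring.json.add.type.headers";

    // Builds the producer properties map; the bootstrap address is supplied by the caller.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        // Ask the JSON serializer not to add type-info headers to outgoing records.
        props.put(ADD_TYPE_INFO_HEADERS, false);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical local broker address, for illustration only.
        System.out.println(producerProps("localhost:9092"));
    }
}
```

The resulting map could be passed to DefaultKafkaProducerFactory in the same way as the props map above.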
Upvotes: 0
Reputation: 3955
I had a similar issue. Spring Kafka adds type info headers by default if we use JsonSerializer or JsonSerde for values.
To prevent this issue, we need to disable adding the info headers.
If you are fine with the default JSON serialization, then use the following (the key point here is ADD_TYPE_INFO_HEADERS):
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
But if you need a custom JsonSerializer with a specific ObjectMapper (for example with PropertyNamingStrategy.SNAKE_CASE), you should disable adding info headers explicitly on the JsonSerializer, because Spring Kafka ignores the ADD_TYPE_INFO_HEADERS property passed to DefaultKafkaProducerFactory in that case (in my opinion, a bad design in Spring Kafka):
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
Or, if we use JsonSerde, then:
Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
Upvotes: 23
Reputation: 1526
Solved. The problem was not the broker, a Docker cache, or the Spring app.
The problem was a console consumer which I used in parallel for debugging. This was an "old" consumer started with kafka-console-consumer.sh --topic=topic --zookeeper=...
It actually prints a warning when started: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
A "new" consumer with the --bootstrap-server option should be used (especially when using Kafka 1.0 with JsonSerializer).
Note: Using an old consumer here can indeed affect the producer.
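One way to picture why an old consumer can trigger this error is the toy model below. It is not Kafka's code: DownConversionSketch, ToyRecord, and encode are hypothetical names. It assumes the broker has to re-encode stored records in the older message format (magic v1) for a client that does not understand the newer one, and that the older format has no place for record headers. The header name and com.example.MyClass are placeholders. Plain String messages carry no headers and pass, while JsonSerializer messages with a type header do not.

```java
import java.util.Collections;
import java.util.Map;

final class DownConversionSketch {

    static final byte MAGIC_V1 = 1; // older message format, no headers
    static final byte MAGIC_V2 = 2; // newer message format, supports headers

    // Toy record: a value plus optional headers (hypothetical type, not Kafka's).
    static final class ToyRecord {
        final String value;
        final Map<String, String> headers;

        ToyRecord(String value, Map<String, String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    // Re-encodes a record for a client that only understands the given format.
    static String encode(ToyRecord record, byte magic) {
        if (magic < MAGIC_V2 && !record.headers.isEmpty()) {
            throw new IllegalArgumentException(
                    "Magic v" + magic + " does not support record headers");
        }
        return "v" + magic + ":" + record.value;
    }

    public static void main(String[] args) {
        ToyRecord plain = new ToyRecord("test", Collections.emptyMap());
        // Placeholder type header, similar in spirit to what a JSON serializer might add.
        ToyRecord typed = new ToyRecord("{\"a\":1}",
                Collections.singletonMap("__TypeId__", "com.example.MyClass"));

        System.out.println(encode(plain, MAGIC_V1)); // no headers: old format is fine
        System.out.println(encode(typed, MAGIC_V2)); // headers: new format is fine
        try {
            encode(typed, MAGIC_V1);                 // headers + old format: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```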
Upvotes: 2
Reputation: 174719
I just ran a test against that docker image with no problems...
$ docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS         PORTS                                                NAMES
f093b3f2475c   kafkadocker_kafka        "start-kafka.sh"         33 minutes ago   Up 2 minutes   0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago   Up 2 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1
.
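import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    // Sends one record (key "bar", value "baz") to topic "foo" on startup.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    // Prints every value received from topic "foo".
    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }
}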
.
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
.
2017-12-23 13:27:27.990 INFO 21305 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
EDIT
Still works for me...
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
.
2017-12-23 15:27:59.997 INFO 44079 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
...
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
...
2017-12-23 15:28:00.071 INFO 44079 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
Upvotes: 1