Will

Reputation: 320

Kafka producer JSON serialization

I'm trying to use Spring Cloud Stream to integrate with Kafka. The message being written is a Java POJO, and while it works as expected (the message is written to the topic and I can read it with a consumer app), some unknown characters are being added to the start of the message. These cause trouble when I try to use Kafka Connect to sink the messages from the topic.

With the default setup, this is the message pushed to Kafka:

     contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}
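The leading characters look like Spring Cloud Stream's embedded headers. Judging only from the bytes captured in this question (not from the framework's source), the framing appears to be: a 0xFF marker byte, a one-byte header count, then for each header a one-byte name length, the name, a four-byte big-endian value length and the JSON-encoded value, followed by the payload. A minimal sketch of a decoder for that assumed layout, which a consumer that cannot change the producer might use to strip the headers; `EmbeddedHeaderDecoder` and `Frame` are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that splits an "embeddedHeaders" frame into headers and payload.
 * The layout is an assumption inferred from the captured message.
 */
public class EmbeddedHeaderDecoder {

    /** Marker byte that appears to start every embedded-header frame. */
    static final int MARKER = 0xFF;

    public static final class Frame {
        public final Map<String, String> headers;
        public final byte[] payload;

        Frame(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    /** True if the record starts with the assumed 0xFF marker. */
    public static boolean hasEmbeddedHeaders(byte[] bytes) {
        return bytes.length > 1 && (bytes[0] & 0xFF) == MARKER;
    }

    public static Frame decode(byte[] bytes) {
        if (!hasEmbeddedHeaders(bytes)) {
            // No marker: treat the whole record as the payload.
            return new Frame(new LinkedHashMap<>(), bytes);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        buf.get();                               // skip the 0xFF marker
        int headerCount = buf.get() & 0xFF;
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < headerCount; i++) {
            byte[] name = new byte[buf.get() & 0xFF]; // 1-byte name length
            buf.get(name);
            byte[] value = new byte[buf.getInt()];    // 4-byte value length
            buf.get(value);
            // Values are kept as-is, i.e. still JSON-encoded (e.g. "\"text/plain\"").
            headers.put(new String(name, StandardCharsets.UTF_8),
                        new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new Frame(headers, payload);
    }
}
```

Decoding the record above this way would, under the same assumption, yield the headers `contentType` and `originalContentType` and leave only the JSON document as the payload.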

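If I configure the Kafka producer in Java code instead, the message is written to the topic without the leading characters/headers:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Broker to connect to
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys are written as plain strings
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values (the POJO) are written as JSON
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<String, Object>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}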

Message on Kafka:

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}}

Since I'm only setting the key/value serializers, I would have expected to be able to do this in the application.yml configuration file rather than in code. However, when I update the YAML to specify the serializers, it doesn't behave as expected: it does not produce the same message as the producer configured in Java above:

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

Message on Kafka:

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
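The quoted value above appears to be base64. Jackson, which JsonSerializer uses, writes a byte[] as a base64 JSON string, so one plausible reading is that the binder first produced its own header-framed bytes and JsonSerializer then serialized those bytes a second time. A minimal sketch for checking that with the JDK's java.util.Base64, assuming the record value is exactly the quoted string shown; `DoubleEncodingCheck` is a hypothetical name:

```java
import java.util.Base64;

public class DoubleEncodingCheck {

    // Removes the surrounding JSON quotes (if any) and base64-decodes the rest,
    // undoing Jackson's byte[] -> base64 string serialization.
    public static byte[] unwrap(String recordValue) {
        String s = recordValue.trim();
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            s = s.substring(1, s.length() - 1);
        }
        // The basic decoder also accepts input without '=' padding.
        return Base64.getDecoder().decode(s);
    }

    // True if the decoded bytes start with the 0xFF marker seen on embedded-header frames.
    public static boolean looksFramed(byte[] decoded) {
        return decoded.length > 0 && (decoded[0] & 0xFF) == 0xFF;
    }
}
```

Decoding just the first characters of that value, `/wILY29udGVudFR5cGU`, gives the bytes 0xFF, 0x02, 0x0B followed by `contentType`, which matches the header-plus-payload layout of the default-setup message.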

Should it be possible to configure this solely through the application yml? Are there additional settings that are missing?

Upvotes: 3

Views: 28197

Answers (3)

Senthil

Reputation: 289

The spring.kafka.producer.value-serializer property can now be used:

yml:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

properties:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Upvotes: 1

Will

Reputation: 320

Credit to @Gary for his answer!

For completeness, the configuration which is now working for me is below.

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          producer:
            useNativeEncoding: true
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

Upvotes: 3

Gary Russell

Reputation: 174494

See headerMode and useNativeEncoding in the producer properties (....session.producer.useNativeEncoding).

headerMode

When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.

Default: embeddedHeaders.
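To make the difference between the two header modes concrete, here is a minimal sketch of the bytes each would put on the topic. The embedded layout (0xFF marker, header count, then length-prefixed names and values, then the payload) is an assumption inferred from the message captured in the question, not taken from Spring Cloud Stream's source; `HeaderModeSketch` and `frame` are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderModeSketch {

    public enum HeaderMode { EMBEDDED_HEADERS, RAW }

    // Returns the bytes that would be written for a payload under each mode,
    // using the assumed embedded-header layout.
    public static byte[] frame(Map<String, String> headers, byte[] payload, HeaderMode mode) {
        if (mode == HeaderMode.RAW) {
            return payload; // raw: only the payload, no header bytes
        }
        int size = 2; // marker byte + header-count byte
        for (Map.Entry<String, String> e : headers.entrySet()) {
            size += 1 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                  + 4 + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size + payload.length);
        buf.put((byte) 0xFF).put((byte) headers.size());
        for (Map.Entry<String, String> e : headers.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);    // must fit in one byte
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8); // JSON-encoded value
            buf.put((byte) name.length).put(name).putInt(value.length).put(value);
        }
        buf.put(payload);
        return buf.array();
    }
}
```

Only the RAW branch leaves a record that a non-Spring Cloud Stream consumer such as Kafka Connect can read as plain JSON.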

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (e.g. by setting an appropriate Kafka producer value serializer). When this configuration is used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (e.g. a Kafka consumer value deserializer) to deserialize the inbound message. Also, when native encoding/decoding is used, the headerMode property is ignored and headers are not embedded into the message.

Default: false.

Upvotes: 1