RyanQuey

Reputation: 735

Spring Cloud Stream is generating Avro Schema in Confluent Schema Registry without any fields

Project Description

I have a stream in Spring Cloud Data Flow that sources from a Kafka topic whose messages use an Avro schema (by way of Confluent Schema Registry), does some lightweight processing, then sinks to another topic.

Problem Description

Consuming from the first topic works great, which I can verify in the logs. However, when I try to produce to the second topic, it doesn't work. If I set spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas to true, I get this exception:

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes", failedMessage=GenericMessage [payload=byte[214], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=, kafka_receivedTopic=<my-source-topic>, target-protocol=kafka, kafka_offset=10209, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@180bdb9b, id=fbb4b321-0ece-f5c1-2399-29a1f41cd024, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1630351912519, contentType=application/avro, kafka_groupId=<my-scdf-stream-name>, timestamp=1630352217225}
…
    Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409 

If I set spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas to false, I get this instead:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

If I delete the existing schemas from the Schema Registry and try again with auto.register.schemas set to true, this is the schema it registers for me:

{"subject":"<my-sink-topic>-value","version":12,"id":28,"schema":"\"bytes\""}

NOTE: Seeing Error retrieving Avro schema: "bytes" makes me wonder if something is off there, since the registry ends up with a subject whose schema is just "bytes". Is that a hint about what is going wrong?

Whatever the case, records do now show up in the topic, but each one is just a single string with no fields, i.e., something like this:

null    "R220e2388-60ef-4887-b766-9f11ffa948a7\fcreate\u0010<schema-name>\u0000\u0002\u0012Kalamazoo\u0000\u0000\u0002\u00142021-07-07\u0002\u00142021-07-07\u0002\u0012fake-dataB61138 \u0000\u0002\u00142021-07-07\u0002\n49006"

(The null part is the key, which is to be expected, given that there's no key.) But this is very different from what it should be. Note that I'm using the kafka-avro-console-consumer to consume this topic.
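The console consumer invocation is along these lines (broker and registry addresses are just examples for a local setup):

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic <my-sink-topic> \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://localhost:8081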

This is what my processor Java code looks like:

  @Bean
  public Function<GenericRecord, GenericRecord> process() {
    return genericRecord -> {
      try {
        // Log the incoming record, then overwrite one field as the "lightweight processing"
        logger.info(genericRecord.toString());
        genericRecord.put("val", "fake-data");
      } catch (Exception e) {
        logger.error("Failed to process record", e);
      }
      return genericRecord;
    };
  }

It seems that something in Spring is converting the GenericRecord before it reaches the serializer, so that a proper schema is never generated for it, only "bytes".

Any ideas on why the writer schema generated for these GenericRecords has no fields?

Relevant Resources

I checked the following guides and posts, but as far as I can tell I'm doing things the way they describe:

Some of the relevant configs:

spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.output.content-type=application/*+avro
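For completeness, the KafkaAvroSerializer also needs the Schema Registry location, which can be passed the same way (the URL below is just a placeholder):

spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url=http://localhost:8081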

Sample code for reproducing the issue:

https://github.com/Anant/avro-scdf-schema-registry

Upvotes: 1

Views: 1262

Answers (1)

RyanQuey

Reputation: 735

It turns out that we needed to set useNativeEncoding to true, as per this example.

For the relevant documentation, see here: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties
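Concretely, useNativeEncoding tells the output binding to skip Spring Cloud Stream's own message conversion and hand the GenericRecord straight to the configured KafkaAvroSerializer. A rough sketch of the relevant properties (the binding name depends on whether you're using the legacy output channel or the functional process-out-0 style):

spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
# or, with functional bindings derived from process():
spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer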

Upvotes: 1
