Reputation: 735
I have a stream in Spring Cloud Data Flow that sources from a Kafka topic with an Avro schema (by way of the Confluent Schema Registry), does some lightweight processing, then sinks to another topic.
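For context, the stream definition looks roughly like this, using named destinations on both ends (app and topic names here are placeholders, not my real ones):
:my-source-topic > my-processor > :my-sink-topic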
Consuming from the first topic works great, and is verified in the logs. However, when I try to produce to the second topic, it doesn't work. If I set spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas to true, then it returns this exception:
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes", failedMessage=GenericMessage [payload=byte[214], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=, kafka_receivedTopic=<my-source-topic>, target-protocol=kafka, kafka_offset=10209, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@180bdb9b, id=fbb4b321-0ece-f5c1-2399-29a1f41cd024, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1630351912519, contentType=application/avro, kafka_groupId=<my-scdf-stream-name>, timestamp=1630352217225}
…
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
If I set spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas to false, I get this instead:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: "bytes"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
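For reference, this is roughly the shape of that setting in application.properties form (a sketch; when deploying through SCDF the same key can be passed as an app.<processor-name>.* deployment property):
spring.cloud.stream.kafka.binder.producer-properties.auto.register.schemas=true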
If I delete the existing schemas in the schema registry and try again, with auto.register.schemas set to true instead, then this is what it shows as the schema it's generating for me:
{"subject":"<my-sink-topic>-value","version":12,"id":28,"schema":"\"bytes\""}
NOTE: Seeing Error retrieving Avro schema: "bytes" makes me wonder if something is off there, and whether that is why it generates a subject in the Schema Registry with just "bytes" as the schema. Is this a clue to what is going wrong here?
Whatever the case is, records now show up in the topic, and are just a single string with no fields, i.e., something like this:
null "R220e2388-60ef-4887-b766-9f11ffa948a7\fcreate\u0010<schema-name>\u0000\u0002\u0012Kalamazoo\u0000\u0000\u0002\u00142021-07-07\u0002\u00142021-07-07\u0002\u0012fake-dataB61138 \u0000\u0002\u00142021-07-07\u0002\n49006"
(The null part is the key, which is somewhat to be expected, given that there's no key.) But this is very different from what it should be (note that I'm using the kafka-avro-console-consumer to consume this topic).
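In case it matters, this is roughly the consumer invocation (broker and registry addresses are placeholders):
kafka-avro-console-consumer --bootstrap-server <broker>:9092 --topic <my-sink-topic> --from-beginning --property print.key=true --property schema.registry.url=http://<schema-registry-host>:8081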
This is what my processor Java code looks like:
@Bean
public Function<GenericRecord, GenericRecord> process() {
    return genericRecord -> {
        try {
            // Log the incoming record, then overwrite one field before passing it along
            logger.info(genericRecord.toString());
            genericRecord.put("val", "fake-data");
        } catch (Exception e) {
            logger.error("Failed to process record");
            e.printStackTrace();
        }
        return genericRecord;
    };
}
It seems that something is happening in Spring that prevents the schema from being generated properly for this GenericRecord. Any ideas on why the write schema generated for these GenericRecords has no fields?
For reference, these are the relevant producer properties I have set:
spring.cloud.stream.kafka.binder.producer-properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.producer-properties.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.output.content-type=application/*+avro
I also checked the following example, and as far as I can tell I'm doing things according to what it says there:
https://github.com/Anant/avro-scdf-schema-registry
Upvotes: 1
Views: 1262
Reputation: 735
It turns out that we needed to set useNativeEncoding to true, as per this example.
For the relevant documentation, see here: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties
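A minimal sketch of the change, assuming the output binding is named output as in the question (with function-style binding names it would be process-out-0 instead); with native encoding enabled, the binder hands serialization off to the KafkaAvroSerializer configured in the producer properties:
spring.cloud.stream.bindings.output.producer.useNativeEncoding=true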
Upvotes: 1