Reputation: 211
I'm fetching data from oracle db using jdbc source connector
and pushing(both key & value in avro format(schema registry)) to a kafka topic.
Value schema:
{
"subject": "testtopicA-value",
"version": 1,
"id": 1122,
"schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"TIME\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"STATUS\",\"type\":\"string\"},{\"name\":\"NUMBER\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
}
Key Schema :
{
"subject": "testtopicA-key",
"version": 1,
"id": 1123,
"schema": "[\"null\",\"long\"]"
}
I've a kafka stream listening to this topic and has avro Genericrecord. When I started the stream, I started getting ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.GenericRecord
The schema generated by connect has fields that has datatype as "long"
final StreamsBuilder builder = new StreamsBuilder();
KStream<?,?> stream;
stream = builder.stream(INPUTTOPIC);
((KStream<GenericRecord,GenericRecord>) stream)
.filter((k,v) -> v != null)
.map((k,v)->
{
.........
.......
Does anyone has suggestions on how to resolve this issue? Confluent version : 6.2.0
Upvotes: 0
Views: 259
Reputation: 191681
Based on the error, you've not set the default key/value serde to be the Avro one; you've left it as LongSerde
Upvotes: 0