kuti

Reputation: 211

JDBC Kafka Connect Avro schema incompatibility with stream

I'm fetching data from an Oracle DB using the JDBC source connector and pushing it to a Kafka topic (both key and value in Avro format, via Schema Registry).

Value schema:
{
"subject": "testtopicA-value",
"version": 1,
"id": 1122,
"schema": "{\"type\":\"record\",\"name\":\"ConnectDefault\",\"namespace\":\"io.confluent.connect.avro\",\"fields\":[{\"name\":\"ID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"TIME\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"STATUS\",\"type\":\"string\"},{\"name\":\"NUMBER\",\"type\":[\"null\",\"int\"],\"default\":null}]}"
}

Key schema:
{
"subject": "testtopicA-key",
"version": 1,
"id": 1123,
"schema": "[\"null\",\"long\"]"
}
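For context, the source connector configuration is roughly along these lines (the connection details, table name, topic prefix, and Schema Registry URL are placeholders; the key is populated from the ID column via single message transforms, which is what yields the bare long key schema above):

{
  "name": "oracle-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@//<host>:1521/<service>",
    "connection.user": "<user>",
    "connection.password": "<password>",
    "mode": "timestamp",
    "timestamp.column.name": "TIME",
    "table.whitelist": "<TABLE>",
    "topic.prefix": "<prefix>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "transforms": "createKey,extractKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ID",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "ID"
  }
}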

I have a Kafka Streams application listening to this topic that reads the records as Avro GenericRecords. When I started the stream, I got ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.GenericRecord. The schema generated by Connect has fields whose datatype is "long".

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<?, ?> stream = builder.stream(INPUTTOPIC);

    ((KStream<GenericRecord, GenericRecord>) stream)
        .filter((k, v) -> v != null)
        .map((k, v) -> {
            .........
            .......
        });

Does anyone have suggestions on how to resolve this issue? Confluent version: 6.2.0

Upvotes: 0

Views: 259

Answers (1)

OneCricketeer

Reputation: 191681

Based on the error, you've not set the default key/value serde to be the Avro one; you've left it as LongSerde.
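As a minimal sketch of that change (not the asker's original code; the application id, bootstrap servers, topic name, and Schema Registry URL below are placeholders, and it assumes the GenericAvroSerde from Confluent's kafka-streams-avro-serde artifact), the Streams configuration would look something like:

    import java.util.Properties;

    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;

    public class AvroStreamApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-stream-app");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Use the Schema Registry aware Avro serdes as the defaults instead of LongSerde
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            // The Avro serdes need the registry URL to look up schemas by id
            props.put("schema.registry.url", "http://localhost:8081");           // placeholder

            StreamsBuilder builder = new StreamsBuilder();
            // With the defaults above, no cast is needed; records are decoded via Schema Registry
            KStream<GenericRecord, GenericRecord> stream = builder.stream("testtopicA");
            stream.filter((k, v) -> v != null)
                  .foreach((k, v) -> System.out.println(v.get("STATUS")));

            new KafkaStreams(builder.build(), props).start();
        }
    }

One caveat: the key schema registered for this topic is a bare ["null","long"], so the Avro deserializer will hand back a java.lang.Long for the key rather than a GenericRecord. In that case the key side needs its own serde (for example an Avro-aware primitive/long serde), or the stream's key type should be Long or Object instead of GenericRecord.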

Upvotes: 0
