Reputation: 79
I am getting Avro responses from a Kafka topic in Confluent, and I am facing issues when I try to deserialize them. I don't understand the syntax for defining the Avro deserializer and using it in my Kafka source while reading.
Here is the approach I am currently using.
I have a topic in Confluent named employee that produces a message every 10 seconds, and each message is serialized with Avro through the Confluent Schema Registry.
I am trying to read those messages in my Scala program. I was able to print the serialized messages, but I am not able to deserialize them.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import java.time.Duration
case class emp(
  name: String,
  age: Int,
)
object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schemaRegistryUrl = "http://localhost:8081"
    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("employee")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build
    val streamEnv: DataStream[String] =
      env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "Kafka Source")
    streamEnv.print()
    env.execute("Example")
  }
}
I also tried defining the Avro deserializer in the Kafka source while reading:
.setValueOnlyDeserializer(new AvroDeserializationSchema[emp](classOf[emp]))
I had no luck with that approach either.
Upvotes: 0
Views: 1421
Reputation: 43612
Rather than an AvroDeserializationSchema, you need to use a ConfluentRegistryAvroDeserializationSchema instead. The standard Avro deserializer doesn't understand what to do with the magic byte that the Confluent serializer includes.
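As a rough sketch of what that could look like for the source in the question: the version below reads the topic as Avro GenericRecord values through ConfluentRegistryAvroDeserializationSchema.forGeneric. It assumes the flink-avro-confluent-registry dependency is on the classpath, and the reader schema shown is only a guess based on the emp case class (a string name and an int age); the schema actually registered for the employee topic may differ, so adjust it to match.

```scala
import java.time.Duration

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala._

object Main {
  // ASSUMPTION: reader schema guessed from the `emp` case class in the question.
  // Replace it with the schema that is actually registered for the topic.
  val employeeSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "emp",
      |  "fields": [
      |    {"name": "name", "type": "string"},
      |    {"name": "age",  "type": "int"}
      |  ]
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schemaRegistryUrl = "http://localhost:8081"
    val readerSchema = new Schema.Parser().parse(employeeSchemaJson)

    // The Confluent-aware deserializer strips the magic byte and schema ID,
    // looks up the writer schema in the registry, and decodes the Avro payload.
    val source = KafkaSource.builder[GenericRecord]()
      .setBootstrapServers("localhost:9092")
      .setTopics("employee")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl))
      .build()

    // Pass Avro-aware type information explicitly so Flink does not fall back
    // to generic (Kryo) serialization for GenericRecord.
    val records: DataStream[GenericRecord] = env.fromSource(
      source,
      WatermarkStrategy.forBoundedOutOfOrderness[GenericRecord](Duration.ofSeconds(20)),
      "Kafka Source")(new GenericRecordAvroTypeInfo(readerSchema))

    // Each record prints as JSON-like text; use r.get("name") to read a field.
    records.map(_.toString).print()
    env.execute("Example")
  }
}
```

If you have an Avro-generated class for the record (one that extends SpecificRecord), ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[YourClass], schemaRegistryUrl) is the typed alternative. A plain Scala case class like emp is not an Avro specific record, which is likely part of why the classOf[emp] attempt did not work.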
Upvotes: 1