upendra

Reputation: 45

Not able to read Kafka - Avro Schema messages

Is there any solution to this problem? I am unable to read Kafka Avro schema messages. I am trying to send messages from Logstash to Kafka to HDFS.

The following is the tech stack:

  1. Logstash 2.3 (current production version)
  2. Confluent 3.0
  3. Plugins: logstash-output-kafka and logstash-codec-avro
  4. ZooKeeper 3.4.6
  5. Kafka 0.10.0.0

Logstash config file looks like this:

input {
  stdin {}
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    topic_id => 'logstash_logs14'
    codec => avro {
      schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

The schema.avsc file looks like this:

{
    "type": "record",
    "name": "myrecord",
    "fields": [
        {"name": "message", "type": "string"},
        {"name": "host", "type": "string"}
    ]
}

The following commands were run:

  1. Start ZooKeeper in its own terminal:

    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

  2. Start Kafka in its own terminal:

    ./bin/kafka-server-start ./etc/kafka/server.properties

  3. Start the Schema Registry in its own terminal:

    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

  4. From the Logstash directory, run the following command:

    bin/logstash -f ./bin/logstash.conf

  5. Type the log message that you wish to send to Kafka, e.g. "Hello World".

  6. Consume the topic from Kafka:

    ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning

_While consuming, we get the following error:_

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Please let me know how to solve this problem.

Thanks, Upendra

Upvotes: 4

Views: 2552

Answers (2)

belguith chachia

Reputation: 1

Maybe I am answering too late, but I am facing the same problem right now.

Logstash is using the default serializer here, "org.apache.kafka.common.serialization.StringSerializer".

So if you want to read Avro messages from your event bus, you have to serialize them on the Logstash output with "io.confluent.kafka.serializers.KafkaAvroSerializer", and then use the matching deserializer on the consumer side. The problem is that Logstash does not recognize io.confluent at all, so you have to do some tricky work to add it as dependencies and jars.
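
A minimal sketch of what that output could look like, assuming the Confluent serializer jars have been made available on Logstash's classpath and that your logstash-output-kafka version exposes the value_serializer option (the topic name is carried over from the question):

output {
  kafka {
    topic_id => 'logstash_logs14'
    # Assumption: the io.confluent jars must already be on the classpath
    value_serializer => "io.confluent.kafka.serializers.KafkaAvroSerializer"
  }
}

Note that KafkaAvroSerializer also needs a schema.registry.url producer property, which the plugin may not expose directly; that is part of the tricky wiring mentioned above.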

Upvotes: 0

sdeva

Reputation: 11

How are you writing/publishing to Kafka? You are seeing the SerializationException because the data was not written using the Schema Registry (that is, with the KafkaAvroSerializer), but you are consuming with it: kafka-avro-console-consumer internally uses the Schema Registry (the KafkaAvroDeserializer), which expects the data to be in a specific format, namely <magic byte><schema id><data>. If you use kafka-avro-console-producer to write Avro data, you shouldn't get this exception. Alternatively, you can set KafkaAvroSerializer for the key and value serializers in your producer properties and also set the schema-registry URL:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Serialize keys and values as Confluent-framed Avro: <magic byte><schema id><data>
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// The serializer registers and looks up schemas here
props.put("schema.registry.url", "http://localhost:8081");

Upvotes: 1
