WolfiG
WolfiG

Reputation: 1163

Python-processed Avro formatted data sent through a Apache Kafka does not yield same output when dezerialized in Apache Camel/Java processor

I am running a Kafka broker where I push messages to via a Python program. For efficient data exhchange I use Apache Avro format. At the Kafka broker, the message is picked up by a Camel route with processor. In this processor I want to de-serialize the message and finally want to push data to an InfluxDB.

The process mechanics work, but in the Camel route I do not get the data out I put in. On the Python side I create a dictionary:

testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = long(time.time() * 1000000000)

The corresponding Avro schema on Python side looks like this:

{
  "namespace": "my.namespace",
  "name": "myRecord",
  "type": "record",
  "fields": [
    {"name": "name",         "type": "string"},
    {"name": "double_one",   "type": "double"},
    {"name": "double_two",   "type": "double"},
    {"name": "double_three", "type": "double"},
    {"name": "time_stamp",   "type": "long"}
  ]
}

The Python code for sending the avro-formatted message to Kafka look like this:

def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending message to kafka broker in the avro binary format
    :param dataDict: data dictionary containing message data
    :param topic_id: the Kafka topic to send message to
    :param schemaDefinition: JSON schema definition
    :return: FurtureRecordMetadata
    """
    schema = avro.schema.parse(schemaDefinition)
    writer = avro.io.DatumWriter(schema)
    bytes_stream = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_stream)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_stream.getvalue()

    messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
    
    result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
    return result

The message arrives as expected at the broker, is picked up by camel and processed by the following JAVA code:

from(kafkaEndpoint) //
                .process(exchange -> {
                    Long kafkaInboundTime = Long
                            .parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
                    if (exchange.getIn().getHeader("kafka.KEY") != null) {

                        BinaryDecoder decoder = DecoderFactory.get()
                                .binaryDecoder(exchange.getIn().getBody(InputStream.class), null);

                        SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);

                        System.out.println(datumReader.read(null, decoder).toString());
                    }
                }) //
                .to(influxdbEndpoint);

With avroSchema currently hard coded in the constructor of my class as follows:

avroSchema = SchemaBuilder.record("myRecord") //
                .namespace("my.namespace") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //  
                .endRecord();

The output of System.out.println is

{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}

Obviously, something goes wrong, but I don't know what. Any help appreciated.

Update 1 As the Python code is running on an Intel/Window machine, Kafka (In a VM) and the Java code on Linux machines with unknown architecture, could this effect be caused by different endian-ness of the systems?

Update 1.1 Endian-ness can be excluded. Checked on both sides, both were 'little'

Update 2 As a check I changed the schema definition to string type for all fields. With this definition, values and keys are transferred correctly - Python input and Java/Camel output are the same.

Update 3 The camel rout producer endpoint to Kafka does not have any special features such as deserializers, etc.:

"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"

Upvotes: 2

Views: 973

Answers (1)

WolfiG
WolfiG

Reputation: 1163

I found a solution to my problem. The following Python code produces the desired output into Kafka:

def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending message to kafka broker in the avro binary format
    :param dataDict: data dictionary containing message data
    :param topic_id: the Kafka topic to send message to
    :param schemaDefinition: JSON schema definition
    :return: None
    """
    schema = avro.schema.parse(schemaDefinition)

    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer = DatumWriter(schema)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_writer.getvalue()

    self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)

    try:
        # NOTE: I use the 'AVRO' key to separate avro formatted messages from others 
        result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
    except Exception as err:
        print(err)
    self._messageBrokerWriterConnection.flush()

Key to the solution was adding the valueDeserializer=... to the end point definition on the Apache Camel side:

import org.apache.kafka.common.serialization.ByteArrayDeserializer;

 ...

TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());

The Apache camel route code including the conversion to InfluxDB point can then be written like this:

@Component
public class Route_TEST_QUEUE extends RouteBuilder {

   Schema avroSchema = null;

   private Route_TEST_QUEUE() {
       avroSchema = SchemaBuilder //
             .record("ElectronCoolerCryoMessage") //       
             .namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
            .fields() //
            .requiredString("name") //
            .requiredDouble("double_one") //
            .requiredDouble("double_two") //
            .requiredDouble("double_three") //
            .requiredLong("time_stamp") // 
            .endRecord();
    }

    private String fromEndpoint = TEST_QUEUE.definitionString();

    @Override
    public void configure() throws Exception {

        from(fromEndpoint) //
                .process(messagePayload -> {        
                    byte[] data = messagePayload.getIn().getBody(byte[].class);
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
                    GenericRecord record = datumReader.read(null, decoder);

                    try {
                        Point.Builder influxPoint = Point
                            .measurement(record.get("name").toString());
                        long acqStamp = 0L;
                        if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
                            acqStamp = (long) record.get("time_stamp");
                        } else {
                            acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
                        }

                        influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);

                        Map<String, Object> fieldMap = new HashMap<>();

                        avroSchema.getFields().stream() //
                                .filter(field ->    !field.name().equals("keyFieldname")) //
                                .forEach(field -> {
                                     Object value = record.get(field.name());
                                    fieldMap.put(field.name().toString(), value);
                                });

                        influxPoint.fields(fieldMap);

                    } catch (Exception e) {
                         MessageLogger.logError(e);
                    }
                }) //
                .to(...InfluxEndpoint...) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
        }
    }
}

This works for my purposes - no confluent, only kafka.

Upvotes: 2

Related Questions