Reputation: 1163
I am running a Kafka broker to which I push messages from a Python program. For efficient data exchange I use the Apache Avro format. On the broker, the message is picked up by a Camel route with a processor. In this processor I want to deserialize the message and finally push the data to an InfluxDB.
The mechanics of the process work, but in the Camel route I do not get out the data I put in. On the Python side I create a dictionary:
testDict = dict()
testDict['name'] = 'avroTest'
testDict['double_one'] = 1.2345
testDict['double_two'] = 1.23
testDict['double_three'] = 2.345
testDict['time_stamp'] = int(time.time() * 1000000000)  # Python 3: int, not long
The corresponding Avro schema on Python side looks like this:
{
    "namespace": "my.namespace",
    "name": "myRecord",
    "type": "record",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "double_one", "type": "double"},
        {"name": "double_two", "type": "double"},
        {"name": "double_three", "type": "double"},
        {"name": "time_stamp", "type": "long"}
    ]
}
The Python code for sending the Avro-formatted message to Kafka looks like this:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: str, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: FutureRecordMetadata
    """
    schema = avro.schema.parse(schemaDefinition)
    writer = avro.io.DatumWriter(schema)
    bytes_stream = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_stream)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_stream.getvalue()
    messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=<connectionUrl>, client_id='testLogger')
    result = messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO_FORMAT'.encode('UTF-8'))
    return result
The message arrives at the broker as expected, is picked up by Camel and processed by the following Java code:
from(kafkaEndpoint) //
    .process(exchange -> {
        Long kafkaInboundTime = Long
                .parseLong(exchange.getIn().getHeader("kafka.TIMESTAMP").toString());
        if (exchange.getIn().getHeader("kafka.KEY") != null) {
            BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(exchange.getIn().getBody(InputStream.class), null);
            SpecificDatumReader<Record> datumReader = new SpecificDatumReader<>(avroSchema);
            System.out.println(datumReader.read(null, decoder).toString());
        }
    }) //
    .to(influxdbEndpoint);
avroSchema is currently hard-coded in the constructor of my class as follows:
avroSchema = SchemaBuilder.record("myRecord") //
        .namespace("my.namespace") //
        .fields() //
        .requiredString("name") //
        .requiredDouble("double_one") //
        .requiredDouble("double_two") //
        .requiredDouble("double_three") //
        .requiredLong("time_stamp") //
        .endRecord();
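Equivalently, the schema could be parsed from the same JSON definition the Python side uses, which avoids keeping two copies in sync; a minimal sketch with org.apache.avro.Schema.Parser (the JSON string schemaJson is assumed to be loaded from a resource shared with the Python producer):

// parse the schema from the shared JSON definition instead of rebuilding it with SchemaBuilder
Schema avroSchema = new Schema.Parser().parse(schemaJson);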
The output of System.out.println is:
{"name": "avroTest", "double_one": 6.803527358993313E-220, "double_two": -0.9919128115125185, "double_three": -0.9775074719163893, "time_stamp": 20}
Obviously, something goes wrong, but I don't know what. Any help appreciated.
Update 1: Since the Python code runs on an Intel/Windows machine, while Kafka (in a VM) and the Java code run on Linux machines of unknown architecture, could this effect be caused by a different endianness of the systems?
Update 1.1: Endianness can be ruled out. I checked on both sides; both are 'little'.
Update 2: As a check, I changed the schema definition to the string type for all fields. With this definition, values and keys are transferred correctly - the Python input and the Java/Camel output are identical.
Update 3: The Camel route's consumer endpoint for Kafka does not have any special options such as deserializers:
"kafka:myTopicName?brokers=host:9092&clientId=myClientID&autoOffsetReset=earliest"
Upvotes: 2
Views: 973
Reputation: 1163
I found a solution to my problem. The following Python code produces the desired messages in Kafka:
def sendAvroFormattedMessage(self, dataDict: dict, topic_id: MessageBrokerQueue, schemaDefinition: str) \
        -> FutureRecordMetadata:
    """
    Method for sending a message to the Kafka broker in the Avro binary format
    :param dataDict: data dictionary containing the message data
    :param topic_id: the Kafka topic to send the message to
    :param schemaDefinition: JSON schema definition
    :return: None
    """
    schema = avro.schema.parse(schemaDefinition)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer = DatumWriter(schema)
    writer.write(dataDict, encoder)
    raw_bytes = bytes_writer.getvalue()
    self._messageBrokerWriterConnection = KafkaProducer(bootstrap_servers=self._connectionUrl)
    try:
        # NOTE: I use the 'AVRO' key to separate Avro-formatted messages from others
        result = self._messageBrokerWriterConnection.send(topic=topic_id, value=raw_bytes, key='AVRO'.encode('UTF-8'))
    except Exception as err:
        print(err)
    self._messageBrokerWriterConnection.flush()
Key to the solution was adding valueDeserializer=... to the endpoint definition on the Apache Camel side. Without this option, the Camel Kafka component falls back to its default StringDeserializer, and round-tripping the binary Avro payload through a UTF-8 string corrupts it - which also explains why the all-string schema from Update 2 came through unharmed while the doubles did not:
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
...
TEST_QUEUE("kafka:topic_id?brokers=host:port&clientId=whatever&valueDeserializer=" + ByteArrayDeserializer.class.getName());
The Apache Camel route, including the conversion to an InfluxDB point, can then be written like this:
@Component
public class Route_TEST_QUEUE extends RouteBuilder {

    Schema avroSchema = null;

    private Route_TEST_QUEUE() {
        avroSchema = SchemaBuilder //
                .record("ElectronCoolerCryoMessage") //
                .namespace("de.gsi.fcc.applications.data.loggers.avro.messages") //
                .fields() //
                .requiredString("name") //
                .requiredDouble("double_one") //
                .requiredDouble("double_two") //
                .requiredDouble("double_three") //
                .requiredLong("time_stamp") //
                .endRecord();
    }

    private String fromEndpoint = TEST_QUEUE.definitionString();

    @Override
    public void configure() throws Exception {

        from(fromEndpoint) //
                .process(messagePayload -> {
                    byte[] data = messagePayload.getIn().getBody(byte[].class);
                    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(avroSchema);
                    GenericRecord record = datumReader.read(null, decoder);
                    try {
                        Point.Builder influxPoint = Point
                                .measurement(record.get("name").toString());
                        long acqStamp = 0L;
                        if (record.hasField("time_stamp") && (long) record.get("time_stamp") > 0L) {
                            acqStamp = (long) record.get("time_stamp");
                        } else {
                            acqStamp = Long.parseLong(messagePayload.getIn().getHeader("kafka.TIMESTAMP").toString());
                        }
                        influxPoint.time(acqStamp, TimeUnit.NANOSECONDS);
                        Map<String, Object> fieldMap = new HashMap<>();
                        avroSchema.getFields().stream() //
                                .filter(field -> !field.name().equals("keyFieldname")) //
                                .forEach(field -> {
                                    Object value = record.get(field.name());
                                    fieldMap.put(field.name(), value);
                                });
                        influxPoint.fields(fieldMap);
                    } catch (Exception e) {
                        MessageLogger.logError(e);
                    }
                }) //
                .to(...InfluxEndpoint...) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .to("stream:out");
    }
}
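One detail to double-check: the processor above builds the Point but never puts it on the exchange, so the InfluxDB endpoint would not receive it. With camel-influxdb the message body is expected to carry the org.influxdb.dto.Point, so a line like the following (a minimal sketch, using the influxPoint builder and messagePayload exchange from above) belongs at the end of the try block:

// hand the finished Point to the InfluxDB endpoint via the message body
messagePayload.getIn().setBody(influxPoint.build());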
This works for my purposes - no Confluent schema registry, just plain Kafka.
Upvotes: 2