forhas
forhas

Reputation: 11991

Kafka adding prefix to message

Working with kafka 7.2, when sending a message using a producer I find that once consuming it the message arrives with additional section in the beginning of the message.

For example, when sending to kafka a simple string "King Daniel", which in byte array looks like:

4B 69 6E 67 20 44 61 6E 69 65 6C

But when I consume it for some reason I get:

00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C

Which is the string "........ֲִ.|King Daniel"

So I have an additional 12 chars in the beginning of my message. Is this some sort of header? How can I get my original message?

Here is my consumer code:

public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
    long offset = 0;
    while (true) {
    // create a fetch request for partition 0, current offset, and
    // fetch size of 1MB
    FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
    ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

    for (MessageAndOffset msg : messages) {
        ByteBuffer payload = msg.message().payload();
        writer.writeToFile(payload.array());
        // advance the offset after consuming each message
        offset = msg.offset();
    }
    }
} catch (Exception e) {
    LOG.error("Error occured while consuming from kafka", e);
}
}

So I'm writing the msg.message().payload().array() to a file, and then when I open this file I can see the original content with the addition of 12 extra chars in the beginning.

How can I get my exact original message?

Upvotes: 1

Views: 1846

Answers (1)

Wildfire
Wildfire

Reputation: 6418

The problem is that ByteBuffer.array() method returns an array that backs this buffer (see http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#array()).

The ByteBuffer might occupy only a part of backing array. Moreover, this method won't work for read-only ByteBuffers and direct ByteBuffers: it will throw ReadOnlyBufferException if the array is read-only or UnsupportedOperationException if the ByteBuffer doesn't have backing array.

You may use following code snippet to read ByteBuffer contents into an array:

ByteBuffer payload = msg.message().payload();
byte[] contents = new byte[payload.remaining()];
payload.get(contents);
writer.writeToFile(contents);

However, it may worth to extend your writer to write data directly from ByteBuffer and avoid extra copy.

Upvotes: 2

Related Questions