Reputation: 11991
I'm working with Kafka 0.7.2. When I send a message with a producer and then consume it, the message arrives with an extra section at the beginning.
For example, I send the simple string "King Daniel" to Kafka, which as a byte array looks like:
4B 69 6E 67 20 44 61 6E 69 65 6C
But when I consume it, for some reason I get:
00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C
Which is the string "........ֲִ.|King Daniel"
So I have 12 extra bytes at the beginning of my message. Is this some sort of header? How can I get my original message?
Here is my consumer code:
public void start() {
    initConsumer();
    LOG.info("Starting kafka consumer for topic " + topic);
    try {
        long offset = 0;
        while (true) {
            // create a fetch request for partition 0, current offset, and
            // fetch size of 1MB
            FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
            ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
            for (MessageAndOffset msg : messages) {
                ByteBuffer payload = msg.message().payload();
                writer.writeToFile(payload.array());
                // advance the offset after consuming each message
                offset = msg.offset();
            }
        }
    } catch (Exception e) {
        LOG.error("Error occurred while consuming from kafka", e);
    }
}
So I'm writing msg.message().payload().array() to a file, and when I open this file I can see the original content with 12 extra bytes at the beginning.
How can I get my exact original message?
Upvotes: 1
Views: 1846
Reputation: 6418
The problem is that the ByteBuffer.array() method returns the whole array that backs the buffer (see http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#array()).
The ByteBuffer may occupy only part of that backing array, so the extra bytes you see are whatever else is stored in the array before the payload. Moreover, array() does not work for read-only or direct ByteBuffers: it throws ReadOnlyBufferException if the buffer is read-only, or UnsupportedOperationException if the ByteBuffer has no accessible backing array.
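To make the difference concrete, here is a minimal sketch that reproduces the effect without Kafka. The class name BackingArrayDemo, the helper payloadView(), and the 12 zero bytes standing in for the framing are made up for illustration; only the ByteBuffer calls are standard JDK API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BackingArrayDemo {

    // Hypothetical helper: builds an 11-byte view into a larger 23-byte array,
    // mimicking a payload that sits behind 12 bytes of unrelated data.
    static ByteBuffer payloadView() {
        byte[] response = new byte[23]; // first 12 bytes stay zero (stand-in for framing)
        byte[] text = "King Daniel".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(text, 0, response, 12, text.length);
        // wrap(array, offset, length) sets position=12 and limit=23;
        // slice() creates a buffer whose index 0 is index 12 of the shared array.
        return ByteBuffer.wrap(response, 12, text.length).slice();
    }

    public static void main(String[] args) {
        ByteBuffer payload = payloadView();
        System.out.println(payload.remaining());    // 11: bytes the buffer exposes
        System.out.println(payload.array().length); // 23: the whole backing array
        System.out.println(payload.arrayOffset());  // 12: where the view starts in that array
    }
}
```

Writing payload.array() here would emit all 23 bytes, which matches the symptom in the question.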
You can use the following code snippet to read the ByteBuffer contents into an array:
ByteBuffer payload = msg.message().payload();
byte[] contents = new byte[payload.remaining()];
payload.get(contents);
writer.writeToFile(contents);
However, it may be worth extending your writer to write data directly from the ByteBuffer and avoid the extra copy.
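One way to write straight from the buffer, sketched under assumptions: the question does not show the writer class, so BufferFileWriter and its writeToFile(Path, ByteBuffer) signature are hypothetical. FileChannel.write(ByteBuffer) writes only the bytes between the buffer's position and limit, so no intermediate byte[] is needed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class BufferFileWriter {

    // Hypothetical writer method: appends exactly the bytes between the
    // buffer's position and limit to the given file.
    static void writeToFile(Path file, ByteBuffer payload) throws IOException {
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer is left untouched.
        ByteBuffer view = payload.duplicate();
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            // A single write() call may write fewer bytes than requested.
            while (view.hasRemaining()) {
                channel.write(view);
            }
        }
    }
}
```

This sketch opens the file for every call to keep the example short; a long-running consumer would more likely keep one channel open and close it on shutdown.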
Upvotes: 2