Reputation: 904
I've developed a couple of C++ apps that produce and consume Kafka messages (using cppkafka) embedding Protobuf3 messages. Both work fine. The producer's relevant code is:
std::string kafkaString;
cppkafka::MessageBuilder *builder;
...
solidList->SerializeToString(&kafkaString);
builder->payload(kafkaString);
Protobuf objects are serialized to string and inserted as Kafka payload. Everything works fine up to this point. Now, I'm trying to develop a consumer for that in Java. The relevant code should be:
KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);
....
ConsumerRecords<Long, String> records = consumer.poll(100);
for (ConsumerRecord<Long, String> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...
But that fails at compile time. parseFrom complains: The method parseFrom(ByteBuffer) in the type Solidlist.SolidList is not applicable for the arguments (String). So I tried using a ByteBuffer:
KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);
....
ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
for (ConsumerRecord<Long, ByteBuffer> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...
Now the error occurs at run time, still in parseFrom(): Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.nio.ByteBuffer. I know it is a java.lang.String! So I went back to the original version and tried using the value as a byte array:
SolidList solidList = SolidList.parseFrom(record.value().getBytes());
Now the error is again at run time: Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException: Protocol message tag had invalid wire type.
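To make the "invalid wire type" message a little more concrete, here is a minimal sketch of how a single-byte protobuf tag splits into a field number and a wire type. The class and method names are hypothetical and it does not use the protobuf library. It assumes the defined wire types are 0 through 5, and it uses 0xEF as an example because that is the first byte of the UTF-8 encoding of U+FFFD, the replacement character that text decoding substitutes for malformed input. This shows one plausible cause of the exception, not a diagnosis of this particular payload.

```java
class WireTypeCheck {
    // The low three bits of the tag hold the wire type.
    static int wireType(int tagByte) {
        return tagByte & 0x07;
    }

    // For a single-byte tag, the field number sits above the wire type
    // (bit 7 is the varint continuation flag and is masked off).
    static int fieldNumber(int tagByte) {
        return (tagByte & 0x7F) >>> 3;
    }

    // Wire types 0..5 are defined; 6 and 7 are not.
    static boolean isValidWireType(int wireType) {
        return wireType >= 0 && wireType <= 5;
    }

    public static void main(String[] args) {
        int goodTag = 0x0A; // field 1, wire type 2 (length-delimited)
        int badTag = 0xEF;  // first byte of UTF-8 encoded U+FFFD

        System.out.println(fieldNumber(goodTag) + " " + wireType(goodTag)); // prints "1 2"
        System.out.println(isValidWireType(wireType(goodTag)));             // prints "true"
        System.out.println(wireType(badTag));                               // prints "7"
        System.out.println(isValidWireType(wireType(badTag)));              // prints "false"
    }
}
```

If a byte like 0xEF ends up where the parser expects a tag, the wire type reads as 7, which a parser has to reject.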
The protobuf documentation states for the C++ serialization: bool SerializeToString(string* output) const;: serializes the message and stores the bytes in the given string. Note that the bytes are binary, not text; we only use the string class as a convenient container.
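The "binary, not text" note matters on the Java side because decoding arbitrary bytes as UTF-8 and encoding them back is not guaranteed to return the same bytes. The sketch below uses only the JDK. The class name and the three-byte sample (field 1 holding the varint 150, encoded as 08 96 01) are assumptions for illustration, not the actual SolidList payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class BinaryRoundTrip {
    // Decode the bytes as UTF-8 text and encode them again, which is roughly
    // what a String-typed value followed by getBytes() amounts to.
    static byte[] throughString(byte[] raw) {
        String asText = new String(raw, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload: tag 0x08 (field 1, varint), value 150 as 0x96 0x01.
        byte[] raw = {0x08, (byte) 0x96, 0x01};
        byte[] roundTripped = throughString(raw);

        // 0x96 is not valid UTF-8 on its own, so it is replaced by U+FFFD,
        // which re-encodes as the three bytes EF BF BD.
        System.out.println(Arrays.equals(raw, roundTripped)); // prints "false"
        System.out.println(roundTripped.length);              // prints "5"
    }
}
```

Keeping the value as byte[] from the consumer through to parseFrom avoids this conversion entirely.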
TL;DR: In consequence, how should I interpret the protobuf C++ "binary bytes" in Java?
This seems related (it is the opposite) but doesn't help: Protobuf Java To C++ Serialization [Binary]
Thanks in advance.
Upvotes: 4
Views: 1764
Reputation: 19187
Try implementing a Deserializer and passing it to the KafkaConsumer constructor as the value deserializer. It could look like this:
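class SolidListDeserializer implements Deserializer<SolidList> {
@Override
public SolidList deserialize(final String topic, byte[] data) {
try {
return SolidList.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
// parseFrom throws a checked exception that deserialize() cannot declare
throw new SerializationException("Could not parse SolidList from topic " + topic, e);
}
}
...
}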
...
KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer());
Upvotes: 1
Reputation: 21
You can read the Kafka records as ConsumerRecords<Long, String>, and then call SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));
Upvotes: 1