Reputation: 3427
How can I read data from Kafka in byte[] format?
I have an implementation that reads events as String with SimpleStringSchema(), but I couldn't find a schema to read the data as byte[].
Here is my code:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka1:9092");
properties.setProperty("zookeeper.connect", "zookeeper1:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
properties.setProperty("auto.offset.reset", "earliest");
DataStream<byte[]> stream = env
        .addSource(new FlinkKafkaConsumer010<byte[]>("testStr", ?, properties));
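For comparison, the String version I already have looks roughly like this (same topic and properties as above; depending on the Flink version, SimpleStringSchema lives in org.apache.flink.api.common.serialization or org.apache.flink.streaming.util.serialization):
// Existing String-based consumer (sketch), reusing the "testStr" topic and
// the properties object from the snippet above.
DataStream<String> stringStream = env
        .addSource(new FlinkKafkaConsumer010<>("testStr", new SimpleStringSchema(), properties));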
Upvotes: 2
Views: 3831
Reputation: 11
For Scala, you would write it as follows:
// Identity schema: hand the raw Kafka record bytes straight through.
new AbstractDeserializationSchema[Array[Byte]]() {
  override def deserialize(bytes: Array[Byte]): Array[Byte] = bytes
}
Upvotes: 0
Reputation: 3427
Finally, I found the solution:
DataStream<byte[]> stream = env
        .addSource(new FlinkKafkaConsumer010<>("testStr", new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                // Kafka already delivers the record value as byte[], so pass it through unchanged.
                return bytes;
            }
        }, properties));
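This works because AbstractDeserializationSchema derives the produced TypeInformation (here byte[]) from its generic parameter, so nothing else has to be supplied. As a quick sanity check, the raw stream can then be consumed like any other DataStream; a minimal sketch (the mapping and job name are just illustrative, and MapFunction comes from org.apache.flink.api.common.functions):
// Illustrative only: print the size of each raw record.
stream.map(new MapFunction<byte[], Integer>() {
    @Override
    public Integer map(byte[] value) {
        return value.length;
    }
}).print();

env.execute("read-kafka-bytes"); // hypothetical job name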
Upvotes: 4