Reputation: 10469
I'm using flink-1.0-SNAPSHOT to consume data from Kafka. The data comes in as a Snappy-compressed byte[] that gets passed to Thrift for later use.
When I use Flink to retrieve the data, it's getting corrupted or mishandled somehow such that it can't be decompressed. The code is derived from this sample and is as follows:
DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"),
                new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        // Check whether the payload is still a valid Snappy buffer
        boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
        return String.valueOf(bvalid);
    }
});
isValidCompressedBuffer returns false every time.
The data is known to be good when consumed via other avenues.
What did I miss?
I'm posting this as I couldn't find any examples that used RawSchema.
public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<byte[]> dataStream = env.addSource(
            new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"),
                    new RawSchema(), parameterTool.getProperties()));

    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            // With RawSchema the original bytes arrive intact
            boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
            return 0;
        }
    }).print();

    env.execute();
}
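
RawSchema is not a class that ships with Flink. For reference, a minimal pass-through implementation could look like the sketch below, assuming the DeserializationSchema interface from org.apache.flink.streaming.util.serialization in this Flink version:

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Minimal sketch: hand the raw Kafka message bytes through unchanged
public class RawSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        // Never terminate the stream based on message content
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}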
Upvotes: 1
Views: 1478
Reputation: 1258
Reading byte messages as String is incorrect: decoding arbitrary binary data into a String is lossy, since byte sequences that are not valid in the charset get replaced during decoding, so value.getBytes() does not return the original compressed bytes. You should read the bytes as-is and then decompress:
public Object map(byte[] bytes) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
    ...
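
For completeness, a map function that actually decompresses could look like this minimal sketch, assuming snappy-java (org.xerial.snappy.Snappy) and leaving the Thrift deserialization step out:

import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.xerial.snappy.Snappy;

// Sketch: validate, then decompress the raw Kafka payload
MapFunction<byte[], byte[]> decompress = new MapFunction<byte[], byte[]>() {
    @Override
    public byte[] map(byte[] bytes) throws Exception {
        if (!Snappy.isValidCompressedBuffer(bytes)) {
            throw new IOException("Payload is not a valid Snappy buffer");
        }
        // Snappy.uncompress returns a new array holding the raw data,
        // ready to be handed to a Thrift deserializer downstream
        return Snappy.uncompress(bytes);
    }
};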
Upvotes: 2