Reputation: 45
In Kafka Connect HDFS, we have the SequenceFileWriterProvider.java class below to write Kafka messages in SequenceFile format.
import java.io.IOException;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Provider of a SequenceFile record writer.
 */
public class SequenceFileWriterProvider implements RecordWriterProvider {

  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);

    // Block-compressed SequenceFile keyed by LongWritable with Text values.
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    final SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        // Key: current wall-clock time; value: the raw record bytes wrapped in Text.
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}
We run Confluent 5.0.0 in a Docker container managed by Kubernetes. We have observed that when we delete the replication controller in k8s that runs the Kafka connector and then recreate it, some of the sequence files get corrupted. We have a Spark job that reads this data with SequenceFile.Reader and hits the EOFException below. We also observed that there are two extra bytes at the end of the file. We suspect there is a problem with the SequenceFileWriter and need help validating the writer. Any help would be appreciated. Thanks.
java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
    at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
    at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
    at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
    at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
    at badSequenceFile.main(badSequenceFile.java:345)
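For context, a reader of the shape the stack trace points at might look like the minimal sketch below. This is only an illustration, not our actual badSequenceFile class; the class name SequenceFileValidator and the command-line path argument are made up, and the key/value types mirror the writer above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileValidator {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical path argument; point it at one of the suspect files.
    Path path = new Path(args[0]);
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      LongWritable key = new LongWritable();
      Text value = new Text();
      long count = 0;
      // next() throws EOFException on a truncated or corrupted file,
      // which is where the stack trace above originates.
      while (reader.next(key, value)) {
        count++;
      }
      System.out.println("Read " + count + " records from " + path);
    }
  }
}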
Note: When we delete the connector's temporary files (+tmp) before starting the k8s replication controller, the connector starts clean and does not create bad files.
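For reference, that cleanup step can be scripted with the Hadoop FileSystem API; the sketch below is one way to do it, assuming the connector's temporary files sit under a +tmp directory beneath your topics directory (verify the path against your own layout before deleting anything).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TmpCleaner {
  public static void main(String[] args) throws Exception {
    // Hypothetical location of the connector's +tmp files; pass the real path for your layout,
    // e.g. hdfs://namenode/topics/+tmp (check your topics.dir setting).
    Path tmpDir = new Path(args[0]);
    Configuration conf = new Configuration();
    FileSystem fs = tmpDir.getFileSystem(conf);
    if (fs.exists(tmpDir)) {
      // Recursively delete the temporary files before recreating the replication controller.
      fs.delete(tmpDir, true);
      System.out.println("Deleted " + tmpDir);
    }
  }
}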
Upvotes: 0
Views: 770
Reputation: 45
Modifying writer.append to handle exceptions appears to have fixed the issue, and we no longer get bad sequence files with a misplaced end-of-file (EOF) marker.
In addition, the record value is now converted from byte[] to a String before being wrapped in Text.
return new RecordWriter<SinkRecord>() {
  @Override
  public void write(SinkRecord record) {
    if (record != null) {
      byte[] text = (byte[]) record.value();
      try {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text(new String(text))
        );
      } catch (Exception e) {
        // logger is assumed to be an SLF4J Logger field on the provider class.
        logger.error("Exception encountered: " + e + " for text: " + new String(text));
      }
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
};
Upvotes: 0