Rupesh More

Reputation: 45

kafka-connect-hdfs: SequenceFileWriter creates bad files on connector restart, causing EOFException in SequenceFileReader

In kafka-connect-hdfs, we have the SequenceFileWriter.java class below to write Kafka messages in SequenceFile format.

import java.io.IOException;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Provider of a Sequence File record writer.
 */
public class SequenceFileWriterProvider implements RecordWriterProvider
{
  public String getExtension() {
    return "";
  }

  @Override
  public RecordWriter<SinkRecord> getRecordWriter(Configuration conf, String fileName, SinkRecord record, AvroData avroData) throws IOException {
    Path path = new Path(fileName);

    final SequenceFile.Writer writer;
    SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
    SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(LongWritable.class);
    SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(Text.class);
    SequenceFile.Writer.Option optCodec = SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new BZip2Codec());
    writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCodec);

    return new RecordWriter<SinkRecord>() {
      @Override
      public void write(SinkRecord record) throws IOException {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text((byte[]) record.value())
        );
      }

      @Override
      public void close() throws IOException {
        writer.close();
      }
    };
  }
}

We run Confluent 5.0.0 in a Docker container managed by Kubernetes. We have observed that when we delete the replication controller in k8s that runs the Kafka connector and then recreate it, some of the sequence files get corrupted. We have a Spark job that reads this data with a SequenceFileReader and receives the EOFException below. We also observed that two extra bytes appear at the end of the corrupted files. We suspect there is a problem with the SequenceFileWriter and need help validating the writer. Any help would be appreciated. Thanks.

java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.apache.hadoop.io.WritableUtils.readVLong(WritableUtils.java:308)
    at org.apache.hadoop.io.WritableUtils.readVInt(WritableUtils.java:329)
    at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2160)
    at org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2227)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2263)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2394)
    at badSequenceFile.readSequenceFile(badSequenceFile.java:27)
    at badSequenceFile.main(badSequenceFile.java:345)
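
For reference, the Spark side fails in a standard SequenceFile.Reader loop. The actual badSequenceFile.java is not shown, so the sketch below is only an illustration of that read path (the file argument and the printing are assumptions); it shows where reader.next() hits the misplaced EOF marker on a corrupted file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class badSequenceFile {
  public static void readSequenceFile(String file) throws Exception {
    Configuration conf = new Configuration();
    // Open the file with the same key/value types the writer used.
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(file)))) {
      LongWritable key = new LongWritable();
      Text value = new Text();
      // next() throws the EOFException above when it reads past the
      // misplaced end-of-file marker in a corrupted file.
      while (reader.next(key, value)) {
        System.out.println(key.get() + "\t" + value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    readSequenceFile(args[0]);
  }
}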

Note: When we delete the connector's temporary files (+tmp) before starting the k8s replication controller, the connector starts clean and does not create bad files.

Upvotes: 0

Views: 770

Answers (1)

Rupesh More

Reputation: 45

Wrapping writer.append in exception handling appears to have fixed the issue: the connector no longer writes bad sequence files with a misplaced end-of-file (EOF) marker. I also cast the record value from byte[] to String before wrapping it in a Text.

return new RecordWriter<SinkRecord>() {
  @Override
  public void write(SinkRecord record) {
    if (record != null) {
      byte[] text = (byte[]) record.value();
      try {
        writer.append(
            new LongWritable(System.currentTimeMillis()),
            new Text(new String(text))
        );
      } catch (Exception e) {
        // Convert the bytes so the payload is readable in the log.
        logger.error("Exception encountered: " + e + " for text: " + new String(text));
      }
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
};
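
Note that the snippet assumes a logger field on the enclosing provider class, which the class in the question does not declare. A minimal sketch of that declaration, assuming the SLF4J API that Kafka Connect already ships with, would be:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceFileWriterProvider implements RecordWriterProvider {
  // Logger used by the error handling in write() above (assumed SLF4J).
  private static final Logger logger = LoggerFactory.getLogger(SequenceFileWriterProvider.class);
  // ... getExtension() and getRecordWriter() as in the question ...
}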

Upvotes: 0
