DLSMatias

Reputation: 21

How to modify the filename of the S3 object uploaded using the Kafka Connect S3 Connector?

I've been using the S3 connector for a couple of weeks now, and I want to change how the connector names each file. I am using the HourlyBasedPartition, so the path alone is already enough for me to locate each file. I would like every file to have the same generic name, such as 'Data.json.gzip', placed under the path produced by the partitioner.

For example, I want to go from this:

<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

To this:

<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

The goal is to download each file later with a single call to S3, instead of having to look up the filename first and then download it.

Searching through the 'kafka-connect-s3' folder, I found this file: https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java. Near the end, it contains the following functions:

  private RecordWriter getWriter(SinkRecord record, String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",
        encodedPartition,
        commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
    writers.put(encodedPartition, writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix, startOffset);
      commitFiles.put(encodedPartition, commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix, String keyPrefix, String name) {
    String suffix = keyPrefix + dirDelim + name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix + dirDelim + suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix, long startOffset) {
    String name = tp.topic()
                      + fileDelim
                      + tp.partition()
                      + fileDelim
                      + String.format(zeroPadOffsetFormat, startOffset)
                      + extension;
    return fileKey(topicsDir, dirPrefix, name);
  }

I don't know whether this can be customised to do what I want, but it seems closely related to it. I hope it helps.
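To make the naming change concrete, here is a minimal standalone sketch (not connector code) that mirrors the string-building logic of fileKey and fileKeyToCommit above and adds a fixed-name variant. The class name KeySketch, the method names currentKey and fixedKey, the delimiter and zero-padding values, and the example path are all assumptions made for illustration; in the connector these values come from its configuration and from the partitioner.

```java
public class KeySketch {
    // Assumed values for illustration; the connector reads these from its config.
    static final String DIR_DELIM = "/";
    static final String FILE_DELIM = "+";
    static final String ZERO_PAD_OFFSET_FORMAT = "%010d";

    // Mirrors fileKey(...) from the excerpt: joins the optional topics
    // prefix, the directory prefix, and the file name.
    static String fileKey(String topicsPrefix, String keyPrefix, String name) {
        String suffix = keyPrefix + DIR_DELIM + name;
        boolean hasPrefix = topicsPrefix != null && !topicsPrefix.trim().isEmpty();
        return hasPrefix ? topicsPrefix + DIR_DELIM + suffix : suffix;
    }

    // Mirrors fileKeyToCommit(...): <topic>+<kafkaPartition>+<startOffset><extension>.
    static String currentKey(String topicsDir, String dirPrefix, String topic,
                             int partition, long startOffset, String extension) {
        String name = topic
            + FILE_DELIM
            + partition
            + FILE_DELIM
            + String.format(ZERO_PAD_OFFSET_FORMAT, startOffset)
            + extension;
        return fileKey(topicsDir, dirPrefix, name);
    }

    // Hypothetical variant: always uses the fixed name "Data<extension>".
    static String fixedKey(String topicsDir, String dirPrefix, String extension) {
        return fileKey(topicsDir, dirPrefix, "Data" + extension);
    }

    public static void main(String[] args) {
        // Illustrative directory prefix; the real one comes from the partitioner.
        String dirPrefix = "events/year=2020/month=01/day=01/hour=00";
        System.out.println(currentKey("topics", dirPrefix, "events", 0, 42L, ".json.gz"));
        System.out.println(fixedKey("topics", dirPrefix, ".json.gz"));
    }
}
```

One thing the sketch makes visible: with a fixed name, every file written under the same hourly path gets the same key, so files from different Kafka partitions, or from several flushes within the same hour, would overwrite one another unless something else in the path keeps them apart.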

(Submitted an issue to Github as well: https://github.com/confluentinc/kafka-connect-storage-cloud/issues/369)

Upvotes: 1

Views: 1443

Answers (0)
