Yuriy Bondaruk

Reputation: 4750

EMR with multiple encryption key providers

I'm running an EMR cluster with S3 client-side encryption enabled, using a custom key provider. But now I need to write data to multiple S3 destinations using different encryption schemas:

  1. CSE custom key provider
  2. CSE-KMS

Is it possible to configure EMR to use both encryption types by defining some kind of mapping between s3 bucket and encryption type?

Alternatively, since I use Spark Structured Streaming to process and write data to S3, I'm wondering if it's possible to disable encryption in EMRFS and then enable CSE for each stream separately?

Upvotes: 2

Views: 3770

Answers (3)

Dmitry S.

Reputation: 113

When you use EMRFS, you can specify per-bucket configs in the format:

fs.s3.bucket.<bucket name>.<some.configuration>

So, for example, to turn off CSE except for a bucket s3://foobar, you can set:

   "Classification": "emrfs-site",
   "Properties": {
      "fs.s3.cse.enabled": "false",
      "fs.s3.bucket.foobar.cse.enabled": "true",
      [your other configs as usual]
   }

Please note that it must be fs.s3 and not fs.{arbitrary-scheme} like fs.s3n.
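
Applied to the question's setup, a complete emrfs-site classification could look roughly like the sketch below. The bucket name kms-bucket and the key ID are placeholders, and I'm assuming the per-bucket form also works for the other cse.* properties, not only cse.enabled:

{
   "Classification": "emrfs-site",
   "Properties": {
      "fs.s3.cse.enabled": "true",
      "fs.s3.cse.encryptionMaterialsProvider": "my.company.fs.encryption.CustomKeyProvider",
      "fs.s3.bucket.kms-bucket.cse.encryptionMaterialsProvider": "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider",
      "fs.s3.bucket.kms-bucket.cse.kms.keyId": "some-kms-id"
   }
}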

Upvotes: 2

Yuriy Bondaruk

Reputation: 4750

The idea is to support any file system scheme and configure each one individually. For example:

# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider

# no encryption
fs.s3u.cse.enabled = false

# AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id

And then use it in Spark like this:

StreamingQuery writeStream = session
        .readStream()
        .schema(RecordSchema.fromClass(TestRecord.class))
        .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
        .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
        .csv("s3x://aws-s3-bucket/input")
        .as(Encoders.bean(TestRecord.class))
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("path", “s3k://aws-s3-bucket/output”)
        .option("checkpointLocation", “s3u://aws-s3-bucket/checkpointing”)
        .start();

To handle this I've implemented a custom Hadoop file system (extending org.apache.hadoop.fs.FileSystem) that delegates calls to the real file system, but with a modified configuration.

// Create delegate FS
this.config.set("fs.s3n.impl", “com.amazon.ws.emr.hadoop.fs.EmrFileSystem”);
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));

The configuration passed to the delegating file system should take all the original settings and replace any occurrence of the fs.s3*. custom-scheme prefix with fs.s3. (for example, fs.s3k.cse.kms.keyId becomes fs.s3.cse.kms.keyId):

private Configuration substituteS3Config(final Configuration conf) {
    if (conf == null) return null;

    final String fsSchemaPrefix = "fs." + getScheme() + ".";
    final String fsS3SchemaPrefix = "fs.s3.";
    final String fsSchemaImpl = "fs." + getScheme() + ".impl";
    Configuration substitutedConfig = new Configuration(conf);
    for (Map.Entry<String, String> configEntry : conf) {
        String propName = configEntry.getKey();
        if (!fsSchemaImpl.equals(propName)
            && propName.startsWith(fsSchemaPrefix)) {
            final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
            LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
            substitutedConfig.set(newPropName, configEntry.getValue());
        }
    }

    return substitutedConfig;
}

Besides that, make sure that the delegating FS receives URIs and paths with the scheme it supports and returns paths with the custom scheme:

@Override
public FileStatus getFileStatus(final Path f) throws IOException {
    FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
    if (status != null) {
        status.setPath(customS3Path(status.getPath()));
    }
    return status;
}

private Path s3Path(final Path p) {
    if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), SCHEME_S3N));
    }
    return p;
}

private Path customS3Path(final Path p) {
    if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), getScheme()));
    }
    return p;
}

private URI s3nURI(final URI originalUri, final String newScheme) {
    try {
        return new URI(
                newScheme,
                originalUri.getUserInfo(),
                originalUri.getHost(),
                originalUri.getPort(),
                originalUri.getPath(),
                originalUri.getQuery(),
                originalUri.getFragment());
    } catch (URISyntaxException e) {
        LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
    }

    return originalUri;
}
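
The remaining FileSystem overrides follow the same pattern; as a rough sketch (not the exact implementation), open and listStatus would translate the scheme in the same way:

@Override
public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
    // hand the delegate a path with the scheme it actually supports
    return this.delegatingFs.open(s3Path(f), bufferSize);
}

@Override
public FileStatus[] listStatus(final Path f) throws IOException {
    FileStatus[] statuses = this.delegatingFs.listStatus(s3Path(f));
    if (statuses != null) {
        for (FileStatus status : statuses) {
            // translate results back to the custom scheme
            status.setPath(customS3Path(status.getPath()));
        }
    }
    return statuses;
}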

The final step is to register the custom file systems with Hadoop (spark-defaults classification):

spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem
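
In the EMR configurations JSON this corresponds roughly to (same values, wrapped in the spark-defaults classification):

{
   "Classification": "spark-defaults",
   "Properties": {
      "spark.hadoop.fs.s3x.impl": "my.company.fs.DynamicS3FileSystem",
      "spark.hadoop.fs.s3u.impl": "my.company.fs.DynamicS3FileSystem",
      "spark.hadoop.fs.s3k.impl": "my.company.fs.DynamicS3FileSystem"
   }
}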

Upvotes: 2

stevel

Reputation: 13490

I can't speak for Amazon EMR, but with Hadoop's S3A connector you can set the encryption policy on a bucket-by-bucket basis. However, S3A doesn't support client-side encryption, on account of it breaking fundamental assumptions about file lengths (the amount of data you can read MUST equal the length reported in a directory listing/getFileStatus call).

I expect Amazon to do something similar. You may be able to create a custom Hadoop Configuration object with the different settings and use that to retrieve the filesystem instance used to save things. Tricky in Spark, though.
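
As a rough sketch of that idea (the key ID and bucket are placeholders, the fs.s3.cse.* properties are taken from the other answers here, and session is the SparkSession); the tricky part is that Spark's own writers resolve the filesystem from the job's Hadoop configuration rather than from an instance you fetched yourself:

// Copy the job's Hadoop configuration and override the encryption settings
// for this particular destination (names/IDs below are placeholders).
Configuration kmsConf = new Configuration(session.sparkContext().hadoopConfiguration());
kmsConf.set("fs.s3.cse.enabled", "true");
kmsConf.set("fs.s3.cse.encryptionMaterialsProvider",
        "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider");
kmsConf.set("fs.s3.cse.kms.keyId", "some-kms-id");
kmsConf.set("fs.s3.impl.disable.cache", "true"); // don't hand back a cached instance with other settings

FileSystem kmsFs = FileSystem.get(URI.create("s3://aws-s3-bucket/output"), kmsConf);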

Upvotes: 0
