Reputation: 4750
I'm running an EMR cluster with S3 client-side encryption enabled using a custom key provider. But now I need to write data to multiple S3 destinations using different encryption schemes: CSE with a custom key provider, CSE-KMS, and no encryption at all.
Is it possible to configure EMR to use these encryption types by defining some kind of mapping between S3 bucket and encryption type?
Alternatively, since I use Spark Structured Streaming to process and write data to S3, I'm wondering if it's possible to disable encryption on EMRFS but then enable CSE for each stream separately?
Upvotes: 2
Views: 3770
Reputation: 113
When you use EMRFS, you can specify per-bucket configs in the format:
fs.s3.bucket.<bucket name>.<some.configuration>
So, for example, to turn off CSE except for a bucket s3://foobar, you can set:
"Classification": "emrfs-site",
"Properties": {
"fs.s3.cse.enabled": "false",
"fs.s3.bucket.foobar.cse.enabled": "true",
[your other configs as usual]
}
Please note that it must be fs.s3 and not fs.{arbitrary-scheme} like fs.s3n.
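Following the same pattern, the bucket-to-encryption mapping from the question could look roughly like the sketch below. The bucket names are placeholders, and I'm assuming the per-bucket override also applies to the CSE provider/key settings (fs.s3.cse.encryptionMaterialsProvider, fs.s3.cse.kms.keyId), which I haven't verified:

{
  "Classification": "emrfs-site",
  "Properties": {
    "fs.s3.cse.enabled": "false",
    "fs.s3.bucket.custom-encrypted-bucket.cse.enabled": "true",
    "fs.s3.bucket.custom-encrypted-bucket.cse.encryptionMaterialsProvider": "my.company.fs.encryption.CustomKeyProvider",
    "fs.s3.bucket.kms-encrypted-bucket.cse.enabled": "true",
    "fs.s3.bucket.kms-encrypted-bucket.cse.encryptionMaterialsProvider": "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider",
    "fs.s3.bucket.kms-encrypted-bucket.cse.kms.keyId": "some-kms-id"
  }
}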
Upvotes: 2
Reputation: 4750
The idea is to support any file system scheme and configure it individually. For example:
# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider
#no encryption
fs.s3u.cse.enabled = false
#AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id
And then use it in Spark like this:
StreamingQuery writeStream = session
        .readStream()
        .schema(RecordSchema.fromClass(TestRecord.class))
        .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
        .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
        .csv("s3x://aws-s3-bucket/input")
        .as(Encoders.bean(TestRecord.class))
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("path", "s3k://aws-s3-bucket/output")
        .option("checkpointLocation", "s3u://aws-s3-bucket/checkpointing")
        .start();
To handle this I've implemented a custom Hadoop file system (extends org.apache.hadoop.fs.FileSystem) that delegates calls to the real file system, but with a modified configuration.
// Create delegate FS
this.config.set("fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
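For context, a rough skeleton of where that snippet lives could look like the following. The field names and the scheme handling are my own filling-in (the full class isn't shown here), and the remaining FileSystem methods delegate the same way getFileStatus() does further down:

public class DynamicS3FileSystem extends FileSystem {

    private FileSystem delegatingFs;
    private Configuration config;
    private String scheme;  // s3x, s3u or s3k, depending on how the instance was mounted

    @Override
    public void initialize(final URI originalUri, final Configuration conf) throws IOException {
        super.initialize(originalUri, conf);
        this.scheme = originalUri.getScheme();
        this.config = conf;

        // Create delegate FS (the snippet above): point s3n at EMRFS and
        // hand it the substituted fs.s3.* configuration
        this.config.set("fs.s3n.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
        this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
        this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
    }

    @Override
    public String getScheme() {
        // The same class is registered for all three custom schemes
        return scheme;
    }

    // open(), create(), listStatus(), rename(), delete(), mkdirs(), getUri(), ...
    // delegate to delegatingFs, converting paths between the custom scheme and s3n
    // exactly like getFileStatus() below.
}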
The configuration passed to the delegating file system takes all of the original settings and replaces every occurrence of the custom scheme prefix (fs.s3x., fs.s3u. or fs.s3k.) with fs.s3., since EMRFS only reads the fs.s3. keys. For example, fs.s3k.cse.kms.keyId becomes fs.s3.cse.kms.keyId.
private Configuration substituteS3Config(final Configuration conf) {
    if (conf == null) return null;

    final String fsSchemaPrefix = "fs." + getScheme() + ".";
    final String fsS3SchemaPrefix = "fs.s3.";
    final String fsSchemaImpl = "fs." + getScheme() + ".impl";

    Configuration substitutedConfig = new Configuration(conf);
    for (Map.Entry<String, String> configEntry : conf) {
        String propName = configEntry.getKey();
        if (!fsSchemaImpl.equals(propName)
                && propName.startsWith(fsSchemaPrefix)) {
            final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
            LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
            substitutedConfig.set(newPropName, configEntry.getValue());
        }
    }

    return substitutedConfig;
}
Besides that, make sure the delegating FS receives URIs and paths with its supported scheme (s3n) and returns paths with the custom scheme:
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
    FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
    if (status != null) {
        status.setPath(customS3Path(status.getPath()));
    }
    return status;
}

private Path s3Path(final Path p) {
    if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), SCHEME_S3N));
    }
    return p;
}

private Path customS3Path(final Path p) {
    if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), getScheme()));
    }
    return p;
}

private URI s3nURI(final URI originalUri, final String newScheme) {
    try {
        return new URI(
                newScheme,
                originalUri.getUserInfo(),
                originalUri.getHost(),
                originalUri.getPort(),
                originalUri.getPath(),
                originalUri.getQuery(),
                originalUri.getFragment());
    } catch (URISyntaxException e) {
        LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
    }
    return originalUri;
}
The final step is to register the custom file system with Hadoop (spark-defaults classification):
spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem
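Applied as an EMR configuration classification (the same shape as the emrfs-site example earlier in the thread), that registration would be:

{
  "Classification": "spark-defaults",
  "Properties": {
    "spark.hadoop.fs.s3x.impl": "my.company.fs.DynamicS3FileSystem",
    "spark.hadoop.fs.s3u.impl": "my.company.fs.DynamicS3FileSystem",
    "spark.hadoop.fs.s3k.impl": "my.company.fs.DynamicS3FileSystem"
  }
}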
Upvotes: 2
Reputation: 13490
I can't speak for Amazon EMR, but with Hadoop's s3a connector you can set the encryption policy on a bucket-by-bucket basis. However, S3A doesn't support client-side encryption, on account of it breaking fundamental assumptions about file lengths (the amount of data you can read MUST == the length reported in a directory listing/getFileStatus call).
I expect Amazon to do something similar. You may be able to create a custom Hadoop Configuration object with the different settings and use that to retrieve the filesystem instance used to save things. Tricky in Spark though.
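For example, a minimal sketch of that idea with s3a and server-side encryption might look like this; the bucket name and key id are placeholders, and FileSystem.newInstance() is used so the custom settings aren't lost to the shared FileSystem cache:

// Start from the job's Hadoop configuration (or a fresh one, as here)
Configuration conf = new Configuration();
// s3a supports per-bucket overrides of the form fs.s3a.bucket.<name>.<option>
conf.set("fs.s3a.bucket.encrypted-output.server-side-encryption-algorithm", "SSE-KMS");
conf.set("fs.s3a.bucket.encrypted-output.server-side-encryption.key", "some-kms-id");
// newInstance() bypasses the cached FileSystem instance so these settings actually apply
FileSystem fs = FileSystem.newInstance(URI.create("s3a://encrypted-output/output"), conf);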
Upvotes: 0