Vasyl Sarzhynskyi
Vasyl Sarzhynskyi

Reputation: 3955

Does Spark allow to use Amazon Assumed Role and STS temporary credentials for DynamoDB?

I need to fetch data from DynamoDB tables with Spark using Java. It works fine with user’s access key and secret key:

final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", tableName);
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("dynamodb.awsAccessKeyId",  accessKey);
jobConf.set("dynamodb.awsSecretAccessKey", secretKey);
jobConf.set("dynamodb.endpoint", endpoint);

I need to use AWS assumed role and STS (at least by security reasons) for fetching data from DynamoDB exactly with spark. Is it possible? I found that it possible to use assumed role to access AWS S3 with spark (https://issues.apache.org/jira/browse/HADOOP-12537, https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html), but haven’t found similar idea for DynamoDB.

For receiving STS temporary credentials I use the following code:

AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest()
        .withRoleArn(roleArn)  // arn:aws:iam::XXXXXXX:role/assume-role-DynamoDB-ReadOnly
        .withDurationSeconds(3600)
        .withRoleSessionName("assumed-role-session");
AssumeRoleResult assumeResult = stsClient.assumeRole(assumeRequest);
Credentials credentials = assumeResult.getCredentials();

Invoking credentials.getAccessKeyId(), credentials.getSecretAccessKey() and credentials.getSessionToken() return generated temporary credentials. With these credentials I successfully could take data from DynamoDB using java aws sdk AmazonDynamoDBClient (non-spark approach).

Is it possible with spark? Does spark allow to use something like the following: jobConf.set("dynamodb.awsSessionToken”, sessionToken) ?

Upvotes: 11

Views: 5798

Answers (1)

vkubushyn
vkubushyn

Reputation: 161

Looking through the code, you may be able to use the dynamodb.customAWSCredentialsProvider with an instance of com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider to get what you want working.

https://github.com/awslabs/emr-dynamodb-connector/blob/master/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBConstants.java#L30

https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/STSAssumeRoleSessionCredentialsProvider.html


EDIT: So this was a little harder than I first thought. I ended up implementing my own wrapper around STSAssumeRoleSessionCredentialsProvider.

package foo.bar;

import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

public class HadoopSTSAssumeRoleSessionCredentialsProvider
        implements AWSSessionCredentialsProvider, Configurable {

    private static final String ROLE_ARN_CONF = "assumed.creds.role.arn";
    private static final String SESSION_NAME_CONF = "assumed.creds.session.name";

    private Configuration configuration;
    private STSAssumeRoleSessionCredentialsProvider delegate;

    public AWSSessionCredentials getCredentials() {
        return delegate.getCredentials();
    }

    public void refresh() {
        delegate.refresh();
    }

    public void setConf(Configuration configuration) {
        this.configuration = configuration;
        String roleArn = configuration.get(ROLE_ARN_CONF);
        String sessionName = configuration.get(SESSION_NAME_CONF);

        if (roleArn == null || roleArn.isEmpty() || sessionName == null || sessionName.isEmpty()) {
            throw new IllegalStateException("Please set " + ROLE_ARN_CONF + " and "
                    + SESSION_NAME_CONF + " before use.");
        }
        delegate = new STSAssumeRoleSessionCredentialsProvider.Builder(
                roleArn, sessionName).build();
    }

    public Configuration getConf() {
        return configuration;
    }
}

And then you can use it like this:

val ddbConf: JobConf = new JobConf(sc.hadoopConfiguration)

ddbConf.set("dynamodb.customAWSCredentialsProvider",
    "foo.bar.HadoopSTSAssumeRoleSessionCredentialsProvider")
ddbConf.set("assumed.creds.role.arn", "roleArn")
ddbConf.set("assumed.creds.session.name", "sessionName")

Upvotes: 3

Related Questions