Reputation: 3955
I need to fetch data from DynamoDB tables with Spark using Java. It works fine with a user's access key and secret key:
import org.apache.hadoop.mapred.JobConf;

final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", tableName);
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
jobConf.set("dynamodb.awsAccessKeyId", accessKey);
jobConf.set("dynamodb.awsSecretAccessKey", secretKey);
jobConf.set("dynamodb.endpoint", endpoint);
I need to use an AWS assumed role and STS (at least for security reasons) for fetching data from DynamoDB with Spark. Is it possible? I found that it is possible to use an assumed role to access AWS S3 with Spark (https://issues.apache.org/jira/browse/HADOOP-12537, https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html), but I haven't found a similar option for DynamoDB.
To obtain STS temporary credentials, I use the following code:
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;

AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
AssumeRoleRequest assumeRequest = new AssumeRoleRequest()
        .withRoleArn(roleArn) // arn:aws:iam::XXXXXXX:role/assume-role-DynamoDB-ReadOnly
        .withDurationSeconds(3600)
        .withRoleSessionName("assumed-role-session");
AssumeRoleResult assumeResult = stsClient.assumeRole(assumeRequest);
Credentials credentials = assumeResult.getCredentials();
Invoking credentials.getAccessKeyId(), credentials.getSecretAccessKey() and credentials.getSessionToken() returns the generated temporary credentials. With these credentials I could successfully fetch data from DynamoDB using the AWS Java SDK's AmazonDynamoDBClient (the non-Spark approach).
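For reference, a minimal sketch of that non-Spark path, wrapping the STS credentials in a BasicSessionCredentials (the region name here is an assumption; replace with your own):

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

// Wrap the temporary STS credentials, including the session token.
BasicSessionCredentials sessionCredentials = new BasicSessionCredentials(
        credentials.getAccessKeyId(),
        credentials.getSecretAccessKey(),
        credentials.getSessionToken());

// Build a DynamoDB client that authenticates as the assumed role.
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(sessionCredentials))
        .withRegion("us-east-1") // assumption: use your table's region
        .build();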
Is it possible with Spark? Does Spark allow something like the following?
jobConf.set("dynamodb.awsSessionToken", sessionToken)
Upvotes: 11
Views: 5798
Reputation: 161
Looking through the code, you may be able to use the dynamodb.customAWSCredentialsProvider setting with an instance of com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider to get what you want working.
EDIT: So this was a little harder than I first thought. I ended up implementing my own wrapper around STSAssumeRoleSessionCredentialsProvider.
package foo.bar;

import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSSessionCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * Hadoop-friendly wrapper around STSAssumeRoleSessionCredentialsProvider.
 * Implements Configurable so the role ARN and session name can be read
 * from the job configuration.
 */
public class HadoopSTSAssumeRoleSessionCredentialsProvider
        implements AWSSessionCredentialsProvider, Configurable {

    private static final String ROLE_ARN_CONF = "assumed.creds.role.arn";
    private static final String SESSION_NAME_CONF = "assumed.creds.session.name";

    private Configuration configuration;
    private STSAssumeRoleSessionCredentialsProvider delegate;

    @Override
    public AWSSessionCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
        String roleArn = configuration.get(ROLE_ARN_CONF);
        String sessionName = configuration.get(SESSION_NAME_CONF);
        if (roleArn == null || roleArn.isEmpty() || sessionName == null || sessionName.isEmpty()) {
            throw new IllegalStateException("Please set " + ROLE_ARN_CONF + " and "
                    + SESSION_NAME_CONF + " before use.");
        }
        delegate = new STSAssumeRoleSessionCredentialsProvider.Builder(
                roleArn, sessionName).build();
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }
}
And then you can use it like this:
val ddbConf: JobConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.customAWSCredentialsProvider",
  "foo.bar.HadoopSTSAssumeRoleSessionCredentialsProvider")
ddbConf.set("assumed.creds.role.arn", "roleArn")
ddbConf.set("assumed.creds.session.name", "sessionName")
Upvotes: 3