Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] IAM Assume Role chaining Credentials Provider #744

Draft
wants to merge 1 commit into
base: 10.5.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion kafka-connect-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
</description>

<properties>
<aws.sdk.version>2.20.133</aws.sdk.version>
<aws.version>1.12.524</aws.version>
<s3mock.version>0.2.5</s3mock.version>
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
Expand Down Expand Up @@ -83,6 +84,11 @@
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<!--Pin snappy-java version to fix CVE-2023-34455 -->
<dependency>
<groupId>org.xerial.snappy</groupId>
Expand Down Expand Up @@ -180,7 +186,6 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.s3.auth.iamAssume.AwsIAMAssumeRoleChaining;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand Down Expand Up @@ -62,6 +61,7 @@
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.storage.CompressionType;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.storage.StorageSinkConnectorConfig;
import io.confluent.connect.storage.common.ComposableConfig;
import io.confluent.connect.storage.common.GenericRecommender;
Expand Down Expand Up @@ -118,12 +118,21 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1
);

public static final String AUTH_METHOD = "authentication.method";
public static final String AWS_AUTH_DEFAULT = "Access key and secret";
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final String AWS_ACCESS_KEY_ID_DEFAULT = "";

public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null);

public static final String CUSTOMER_ROLE_ARN_CONFIG = "aws.iam.assume.role";
public static final String CUSTOMER_ROLE_ARN_DEFAULT = "";
public static final String CUSTOMER_ROLE_EXTERNAL_ID_CONFIG = "aws.iam.external.id";
public static final Password CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT = new Password(null);
public static final String MIDDLEWARE_ROLE_ARN_CONFIG = "middleware.aws.iam.assume.role";
public static final String MIDDLEWARE_ROLE_ARN_DEFAULT = "";

public static final String REGION_CONFIG = "s3.region";
public static final String REGION_DEFAULT = Regions.DEFAULT_REGION.getName();

Expand Down Expand Up @@ -375,6 +384,54 @@ public static ConfigDef newConfigDef() {
"AWS Credentials Provider Class"
);

configDef.define(
AUTH_METHOD,
Type.STRING,
AWS_AUTH_DEFAULT,
Importance.HIGH,
"Authentication method used for S3 Sink connector",
group,
++orderInGroup,
Width.LONG,
"Authentication method"
);

configDef.define(
CUSTOMER_ROLE_ARN_CONFIG,
Type.STRING,
CUSTOMER_ROLE_ARN_DEFAULT,
Importance.HIGH,
"Role ARN",
group,
++orderInGroup,
Width.LONG,
"AWS Role ARN"
);

configDef.define(
CUSTOMER_ROLE_EXTERNAL_ID_CONFIG,
Type.PASSWORD,
CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT,
Importance.HIGH,
"External ID",
group,
++orderInGroup,
Width.LONG,
"AWS External ID"
);

configDef.define(
MIDDLEWARE_ROLE_ARN_CONFIG,
Type.STRING,
MIDDLEWARE_ROLE_ARN_DEFAULT,
Importance.HIGH,
"Role ARN",
group,
++orderInGroup,
Width.LONG,
"AWS Role ARN"
);

configDef.define(
AWS_ACCESS_KEY_ID_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -878,26 +935,52 @@ public Password awsSecretKeyId() {
return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

public String awsCustomerRoleARN() {
return getString(CUSTOMER_ROLE_ARN_CONFIG);
}

public String awsMiddlewareRoleARN() {
return getString(MIDDLEWARE_ROLE_ARN_CONFIG);
}

public String awsExternalId() {
return getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG);
}

public int getPartSize() {
return getInt(PART_SIZE_CONFIG);
}

public String getAuthenticationMethod() {
return getString(AUTH_METHOD);
}

@SuppressWarnings("unchecked")
public AWSCredentialsProvider getCredentialsProvider() {
try {
AWSCredentialsProvider provider = ((Class<? extends AWSCredentialsProvider>)
getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance();
getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance();

if (provider instanceof Configurable) {
Map<String, Object> configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX);
configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring(
CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()
CREDENTIALS_PROVIDER_CONFIG_PREFIX.length()
));

configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId());
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value());
String authMethod = getAuthenticationMethod();
if (authMethod == "IAM Assume Role") {
configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN());
configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId());
configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN());

((Configurable) provider).configure(configs);
provider = new AwsIAMAssumeRoleChaining();
((AwsIAMAssumeRoleChaining) provider).configure(configs);
}
else {
configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId());
configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value());
((Configurable) provider).configure(configs);
}
} else {
final String accessKeyId = awsAccessKeyId();
final String secretKey = awsSecretKeyId().value();
Expand All @@ -910,8 +993,8 @@ public AWSCredentialsProvider getCredentialsProvider() {
return provider;
} catch (IllegalAccessException | InstantiationException e) {
throw new ConnectException(
"Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG,
e
"Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG,
e
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.confluent.connect.s3.auth.iamAssume;

import com.amazonaws.auth.*;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.Tag;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG;
import java.util.Map;

public class AwsIAMAssumeRoleChaining implements AWSCredentialsProvider {

private static final ConfigDef STS_CONFIG_DEF = new ConfigDef()
.define(
CUSTOMER_ROLE_EXTERNAL_ID_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.MEDIUM,
"The role external ID used when retrieving session credentials under an assumed role."
).define(
CUSTOMER_ROLE_ARN_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Role ARN to use when starting a session."
).define(
MIDDLEWARE_ROLE_ARN_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Role ARN to use when starting a session."
);

private String customerRoleArn;
private String customerRoleExternalId;
private String middlewareRoleArn;
private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider;

// Method to initiate role chaining
public void configure(Map<String, ?> configs) {
// Assume the initial role
AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs);
customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG);
customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG);
middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG);

STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider(middlewareRoleArn, "middlewareSession", "", null);

// Use the credentials from the initial role to assume the subsequent role
stsCredentialProvider = buildProvider(customerRoleArn, "customerSession", customerRoleExternalId, initialProvider);
}

// Updated buildProvider to optionally accept an existing AwsCredentialsProvider
private STSAssumeRoleSessionCredentialsProvider buildProvider(final String roleArn, final String roleSessionName, final String roleExternalId, STSAssumeRoleSessionCredentialsProvider existingProvider) {
STSAssumeRoleSessionCredentialsProvider credentialsProvider;
// If an existing credentials provider is provided, use it for creating the STS client
if (existingProvider != null) {
AWSCredentials basicCredentials = existingProvider.getCredentials();
credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build()
)
.withExternalId(roleExternalId)
.build();
} else {
credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
.Builder(roleArn, roleSessionName)
.withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
.withExternalId(roleExternalId)
.build();
}
return credentialsProvider;
}

// Helper method to build the AssumeRoleRequest
private AssumeRoleRequest buildRequest(String roleExternalId, String roleArn) {
return AssumeRoleRequest.builder()
.roleArn(roleArn)
.externalId(roleExternalId)
.build();
}

@Override
public AWSCredentials getCredentials() {
return stsCredentialProvider.getCredentials();
}

@Override
public void refresh() {
stsCredentialProvider.refresh();
}
}