Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
Expand Down Expand Up @@ -112,6 +111,7 @@
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.utils.SseKmsUtil.configureEncryptionSettings;

class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {

Expand All @@ -131,7 +131,13 @@ public boolean blobExists(String blobName) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivileged(
() -> clientReference.get()
.headObject(HeadObjectRequest.builder().bucket(blobStore.bucket()).key(buildKey(blobName)).build())
.headObject(
HeadObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build()
)
);
return true;
} catch (NoSuchKeyException e) {
Expand Down Expand Up @@ -216,7 +222,12 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
writeContext.getMetadata(),
blobStore.serverSideEncryptionType(),
blobStore.serverSideEncryptionKmsKey(),
blobStore.serverSideEncryptionBucketKey(),
blobStore.serverSideEncryptionEncryptionContext(),
blobStore.expectedBucketOwner()
);
try {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
Expand Down Expand Up @@ -594,6 +605,7 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) {
.prefix(keyPath)
.delimiter("/")
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();
}

Expand Down Expand Up @@ -630,14 +642,13 @@ void executeSingleUpload(
.contentLength(blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher))
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (CollectionUtils.isNotEmpty(metadata)) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
configureEncryptionSettings(putObjectRequestBuilder, blobStore);

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
Expand Down Expand Up @@ -687,15 +698,14 @@ void executeMultipartUpload(
.key(blobName)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (CollectionUtils.isNotEmpty(metadata)) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);

final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
Expand Down Expand Up @@ -724,6 +734,7 @@ void executeMultipartUpload(
.partNumber(i)
.contentLength((i < nbParts) ? partSize : lastPartSize)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

bytesCount += uploadPartRequest.contentLength();
Expand All @@ -746,6 +757,7 @@ void executeMultipartUpload(
.uploadId(uploadId.get())
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

SocketAccess.doPrivilegedVoid(() -> clientReference.get().completeMultipartUpload(completeMultipartUploadRequest));
Expand All @@ -759,6 +771,7 @@ void executeMultipartUpload(
.bucket(bucketName)
.key(blobName)
.uploadId(uploadId.get())
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.get().abortMultipartUpload(abortRequest));
Expand Down Expand Up @@ -825,12 +838,14 @@ CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
@Nullable Integer partNumber
) {
final boolean isMultipartObject = partNumber != null;
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder().bucket(bucketName).key(blobKey);
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobKey)
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (isMultipartObject) {
getObjectRequestBuilder.partNumber(partNumber);
}

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(response -> transformResponseToInputStreamContainer(response, isMultipartObject))
Expand Down Expand Up @@ -871,6 +886,7 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
Expand All @@ -57,13 +59,17 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.EXPECTED_BUCKET_OWNER_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_TYPE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {
public class S3BlobStore implements BlobStore {

private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

Expand All @@ -81,7 +87,11 @@ class S3BlobStore implements BlobStore {

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;
private volatile String serverSideEncryptionType;
private volatile String serverSideEncryptionKmsKey;
private volatile boolean serverSideEncryptionBucketKey;
private volatile String serverSideEncryptionEncryptionContext;
private volatile String expectedBucketOwner;

private volatile ObjectCannedACL cannedACL;

Expand All @@ -107,7 +117,6 @@ class S3BlobStore implements BlobStore {
S3AsyncService s3AsyncService,
boolean multipartUploadEnabled,
String bucket,
boolean serverSideEncryption,
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
Expand All @@ -119,13 +128,17 @@ class S3BlobStore implements BlobStore {
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
GenericStatsMetricPublisher genericStatsMetricPublisher,
String serverSideEncryptionType,
String serverSideEncryptionKmsKey,
boolean serverSideEncryptionBucketKey,
String serverSideEncryptionEncryptionContext,
String expectedBucketOwner
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
Expand All @@ -142,20 +155,29 @@ class S3BlobStore implements BlobStore {
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
this.serverSideEncryptionType = serverSideEncryptionType;
this.serverSideEncryptionKmsKey = serverSideEncryptionKmsKey;
this.serverSideEncryptionBucketKey = serverSideEncryptionBucketKey;
this.serverSideEncryptionEncryptionContext = serverSideEncryptionEncryptionContext;
this.expectedBucketOwner = expectedBucketOwner;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata) {
this.repositoryMetadata = repositoryMetadata;
this.bucket = BUCKET_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(repositoryMetadata.settings());
this.bufferSize = BUFFER_SIZE_SETTING.get(repositoryMetadata.settings());
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
this.serverSideEncryptionType = SERVER_SIDE_ENCRYPTION_TYPE_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionKmsKey = SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionBucketKey = SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionEncryptionContext = SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING.get(repositoryMetadata.settings());
this.expectedBucketOwner = EXPECTED_BUCKET_OWNER_SETTING.get(repositoryMetadata.settings());
}

@Override
Expand Down Expand Up @@ -191,8 +213,33 @@ public String bucket() {
return bucket;
}

public boolean serverSideEncryption() {
return serverSideEncryption;
public String serverSideEncryptionType() {
return serverSideEncryptionType;
}

public String serverSideEncryptionKmsKey() {
return serverSideEncryptionKmsKey;
}

public boolean serverSideEncryptionBucketKey() {
return serverSideEncryptionBucketKey;
}

/**
* Returns the SSE encryption context base64 UTF8 encoded, as required by S3 SDK. If the encryption context is empty return
* null as the S3 client ignores null header values
*/
public String serverSideEncryptionEncryptionContext() {
return serverSideEncryptionEncryptionContext.isEmpty()
? null
: Base64.getEncoder().encodeToString(serverSideEncryptionEncryptionContext.getBytes(StandardCharsets.UTF_8));
}

/**
* Returns the expected bucket owner if set, else null as the S3 client ignores null header values
*/
public String expectedBucketOwner() {
return expectedBucketOwner.isEmpty() ? null : expectedBucketOwner;
}

public long bufferSizeInBytes() {
Expand Down
Loading
Loading