feat(s3stream): support config checking object integrity on aws client #915

Open · wants to merge 1 commit into base: main
@@ -43,6 +43,8 @@ public class S3StreamConfig {
private int streamSetObjectCompactionForceSplitPeriod = 120;
private int streamSetObjectCompactionMaxObjectNum = 500;
private int streamSplitSizeThreshold = 16777216;
private String checksumForUpload = null;
private boolean enableChecksumForGetObject = false;

public String s3Endpoint() {
return s3Endpoint;
@@ -135,4 +137,12 @@ public int streamSetObjectCompactionMaxObjectNum() {
public int streamSplitSizeThreshold() {
return streamSplitSizeThreshold;
}

public String checksumForUpload() {
return checksumForUpload;
}

public boolean isEnableChecksumForGetObject() {
return enableChecksumForGetObject;
}
}
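
Both defaults leave the feature off: with checksumForUpload unset the operator sends no upload checksums, and enableChecksumForGetObject=false keeps GetObject behavior unchanged. A small hedged sketch of reading the new getters (loadConfig() is a hypothetical helper, not part of this PR):

```java
// Hypothetical usage of the new getters; loadConfig() is assumed, not from this PR.
S3StreamConfig config = loadConfig();
String algo = config.checksumForUpload();                    // e.g. "CRC32C"; null disables upload checksums
boolean verifyGets = config.isEnableChecksumForGetObject();  // true -> GetObject adds ChecksumMode.ENABLED
```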
@@ -61,10 +61,13 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.ChecksumMode;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -107,16 +110,19 @@ public class DefaultS3Operator implements S3Operator {
"s3-write-cb-executor", true, LOGGER);
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);
private final ChecksumAlgorithm checksumForUpload;
private final boolean enableChecksumForGetObject;

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false);
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false, null, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate,
String checksumForUpload, boolean enableChecksumForGetObject) {
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
@@ -125,6 +131,8 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
this.checksumForUpload = ChecksumAlgorithm.fromValue(checksumForUpload);
this.enableChecksumForGetObject = enableChecksumForGetObject;
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
checkConfig();
S3Utils.S3Context s3Context = S3Utils.S3Context.builder()
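
The configured string is translated with the SDK-generated ChecksumAlgorithm.fromValue. A sketch of how that conversion typically behaves in AWS SDK v2 (worth confirming against the SDK version in use):

```java
// Typical behavior of the generated fromValue conversion (verify for your SDK version).
ChecksumAlgorithm none   = ChecksumAlgorithm.fromValue(null);     // null in -> null out; checksums stay disabled
ChecksumAlgorithm crc32c = ChecksumAlgorithm.fromValue("CRC32C"); // maps to ChecksumAlgorithm.CRC32_C
ChecksumAlgorithm other  = ChecksumAlgorithm.fromValue("MD5");    // unrecognized -> UNKNOWN_TO_SDK_VERSION
```

Since an unrecognized value quietly becomes UNKNOWN_TO_SDK_VERSION (which the uploadPart0 switch further down ignores), rejecting unsupported values in checkConfig() may be a friendlier failure mode.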
@@ -151,6 +159,8 @@ public DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
this.writeS3Client = s3Client;
this.readS3Client = s3Client;
this.bucket = bucket;
this.checksumForUpload = null;
this.enableChecksumForGetObject = false;
this.networkInboundBandwidthLimiter = null;
this.networkOutboundBandwidthLimiter = null;
this.inflightWriteLimiter = new Semaphore(50);
@@ -301,7 +311,12 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
TimerUtil timerUtil = new TimerUtil();
long size = end - start + 1;
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();

GetObjectRequest request = GetObjectRequest.builder()
.checksumMode(enableChecksumForGetObject ? ChecksumMode.ENABLED : null)
.bucket(bucket).key(path)
.range(range(start, end)).build();

readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size);
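
ChecksumMode.ENABLED asks S3 to return the stored checksum and lets the SDK validate it against the downloaded bytes, provided the object was uploaded with a checksum algorithm. A standalone sketch outside this code path (bucket and key are placeholders); note that the SDK generally skips this validation for ranged GETs like the one above, since the stored checksum covers the whole object, so the flag mainly benefits full-object reads:

```java
// Full-object GetObject sketch with response checksum validation; placeholder names.
GetObjectRequest req = GetObjectRequest.builder()
    .bucket("my-bucket")
    .key("some/object")
    .checksumMode(ChecksumMode.ENABLED)
    .build();
readS3Client.getObject(req, AsyncResponseTransformer.toBytes())
    .thenAccept(resp -> {
        // The stored checksum (if any) is echoed back on the response metadata.
        System.out.println("crc32c = " + resp.response().checksumCRC32C());
    });
```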
@@ -359,7 +374,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
PutObjectRequest request = PutObjectRequest.builder().checksumAlgorithm(checksumForUpload).bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
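
Setting checksumAlgorithm on the PutObjectRequest makes the SDK compute the checksum of the payload and send it with the request, so S3 can reject a corrupted upload; passing null keeps the previous behavior. A hedged standalone sketch with placeholder values:

```java
// PutObject sketch with an SDK-computed CRC32C checksum; placeholder bucket/key/payload.
byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
PutObjectRequest put = PutObjectRequest.builder()
    .bucket("my-bucket")
    .key("some/object")
    .checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
    .build();
writeS3Client.putObject(put, AsyncRequestBody.fromBytes(payload))
    .thenAccept(resp -> System.out.println("stored crc32c = " + resp.checksumCRC32C()));
```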
@@ -437,7 +452,11 @@ public CompletableFuture<String> createMultipartUpload(String path) {

void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.checksumAlgorithm(checksumForUpload)
.build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(createMultipartUploadResponse.uploadId());
@@ -482,13 +501,39 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
TimerUtil timerUtil = new TimerUtil();
int size = part.readableBytes();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
UploadPartRequest request = UploadPartRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.partNumber(partNumber)
.checksumAlgorithm(checksumForUpload)
.build();
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
CompletedPart.Builder builder = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag());

if (checksumForUpload != null) {
switch (checksumForUpload) {
Contributor comment: How about naming it checksumAlgorithmForUpload directly? It's more understandable than checksumForUpload.

case CRC32:
builder.checksumCRC32(uploadPartResponse.checksumCRC32());
break;
case CRC32_C:
builder.checksumCRC32C(uploadPartResponse.checksumCRC32C());
break;
case SHA1:
builder.checksumSHA1(uploadPartResponse.checksumSHA1());
break;
case SHA256:
builder.checksumSHA256(uploadPartResponse.checksumSHA256());
break;
case UNKNOWN_TO_SDK_VERSION:
break;
}
}

CompletedPart completedPart = builder.build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
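
The switch above copies whichever per-part checksum UploadPart returned onto the CompletedPart; S3 validates those values again at CompleteMultipartUpload time, and omitting them for an upload created with a checksum algorithm is expected to be rejected. The same logic as a standalone helper, a sketch with names of my own choosing:

```java
// Hypothetical helper equivalent to the switch above: echo the per-part checksum
// returned by UploadPart back onto the CompletedPart builder.
private static CompletedPart.Builder copyPartChecksum(CompletedPart.Builder builder,
        ChecksumAlgorithm algorithm, UploadPartResponse response) {
    if (algorithm == null) {
        return builder;
    }
    switch (algorithm) {
        case CRC32:   return builder.checksumCRC32(response.checksumCRC32());
        case CRC32_C: return builder.checksumCRC32C(response.checksumCRC32C());
        case SHA1:    return builder.checksumSHA1(response.checksumSHA1());
        case SHA256:  return builder.checksumSHA256(response.checksumSHA256());
        default:      return builder; // UNKNOWN_TO_SDK_VERSION: nothing to copy
    }
}
```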
@@ -554,6 +599,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();

// note: we currently do not provide the whole-object checksum when completing the multipart upload.
// see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
// see: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
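
As the comment notes, the request above does not carry a whole-object checksum. For multipart uploads S3 reports a composite "checksum of checksums": the chosen algorithm applied to the concatenation of the raw per-part checksums, base64-encoded and suffixed with the part count (see the linked doc). A hedged sketch of computing that composite value client-side so it can be compared with the one returned by CompleteMultipartUpload (helper name and approach are mine, not part of the PR; verify against actual S3 behavior before relying on it):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.zip.CRC32C;

// Composite CRC32C as described in the AWS "checking object integrity" doc:
// CRC32C over the concatenated, base64-decoded part checksums, re-encoded,
// plus "-<partCount>".
static String compositeCrc32c(List<String> base64PartChecksums) {
    CRC32C crc = new CRC32C();
    for (String part : base64PartChecksums) {
        crc.update(Base64.getDecoder().decode(part));
    }
    byte[] value = ByteBuffer.allocate(4).putInt((int) crc.getValue()).array();
    return Base64.getEncoder().encodeToString(value) + "-" + base64PartChecksums.size();
}
```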
@@ -657,7 +706,7 @@ public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePa
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders));
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
.apiCallAttemptTimeout(Duration.ofSeconds(30)).retryPolicy(RetryMode.STANDARD));
return builder.build();
}
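
The override configuration now pins retries to RetryMode.STANDARD, which in current SDK versions caps an operation at three total attempts and throttles retries with a token bucket, rather than the more aggressive LEGACY default. For reference, an equivalent standalone client setup (a sketch, not the PR's exact builder wiring):

```java
// Standalone sketch of the same override configuration; region is a placeholder.
S3AsyncClient client = S3AsyncClient.builder()
    .region(Region.US_EAST_1)
    .overrideConfiguration(b -> b
        .apiCallTimeout(Duration.ofMinutes(1))
        .apiCallAttemptTimeout(Duration.ofSeconds(30))
        .retryPolicy(RetryMode.STANDARD)) // bounded attempts + retry token bucket
    .build();
```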

@@ -874,6 +923,8 @@ public static class Builder {
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
private boolean enableCheckSumForGetObject;
private String checkSumForUploadObject;

public Builder endpoint(String endpoint) {
this.endpoint = endpoint;
@@ -915,9 +966,19 @@ public Builder readWriteIsolate(boolean readWriteIsolate) {
return this;
}

public Builder enableCheckSumForGetObject(boolean enable) {
this.enableCheckSumForGetObject = enable;
return this;
}

public Builder checkSumForUploadObject(String checksum) {
this.checkSumForUploadObject = checksum;
return this;
}

public DefaultS3Operator build() {
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders,
inboundLimiter, outboundLimiter, readWriteIsolate);
inboundLimiter, outboundLimiter, readWriteIsolate, checkSumForUploadObject, enableCheckSumForGetObject);
}
}
}
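
With the two new builder options, constructing an operator that checksums uploads and validates reads might look like the sketch below. The endpoint, region, and bucket values are placeholders, and the region()/bucket() setters are assumed to exist alongside the endpoint() shown above:

```java
// Hedged usage sketch of the extended Builder; placeholder values throughout.
DefaultS3Operator operator = new DefaultS3Operator.Builder()
    .endpoint("https://s3.us-east-1.amazonaws.com")
    .region("us-east-1")                 // assumed setter, mirrors the region field
    .bucket("my-bucket")                 // assumed setter, mirrors the bucket field
    .checkSumForUploadObject("CRC32C")   // forwarded to ChecksumAlgorithm.fromValue(...)
    .enableCheckSumForGetObject(true)    // GetObject requests will set ChecksumMode.ENABLED
    .build();
```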
store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
@@ -88,7 +88,7 @@

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject());

Codecov warning: added line store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java#L91 was not covered by tests.

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
@@ -99,7 +99,7 @@
// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject());

Codecov warning: added line store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java#L102 was not covered by tests.
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);