feat(s3stream): support config checking object integrity on aws client
lifepuzzlefun committed Feb 2, 2024
1 parent 4dae71c commit 12c792b
Showing 3 changed files with 85 additions and 14 deletions.
@@ -43,6 +43,8 @@ public class S3StreamConfig {
private int streamSetObjectCompactionForceSplitPeriod = 120;
private int streamSetObjectCompactionMaxObjectNum = 500;
private int streamSplitSizeThreshold = 16777216;
private String checksumForUpload = null;
private boolean enableChecksumForGetObject = false;

public String s3Endpoint() {
return s3Endpoint;
@@ -135,4 +137,12 @@ public int streamSetObjectCompactionMaxObjectNum() {
public int streamSplitSizeThreshold() {
return streamSplitSizeThreshold;
}

public String checksumForUpload() {
return checksumForUpload;
}

public boolean isEnableChecksumForGetObject() {
return enableChecksumForGetObject;
}
}
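
These accessors only expose the new settings; this commit does not show how they are populated. A minimal sketch of one way to wire them up, assuming hypothetical property keys and matching setters on S3StreamConfig (neither appears in this diff):

// Hypothetical wiring: the property keys and setter names below are illustrative only.
S3StreamConfig streamConfig = new S3StreamConfig();
// Expected values mirror the SDK algorithm names: "CRC32", "CRC32C", "SHA1", "SHA256", or unset.
streamConfig.setChecksumForUpload(System.getProperty("s3.stream.upload.checksum"));
// Boolean.getBoolean returns false when the property is absent.
streamConfig.setEnableChecksumForGetObject(Boolean.getBoolean("s3.stream.get.checksum.enabled"));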
@@ -61,10 +61,13 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.ChecksumMode;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -107,16 +110,19 @@ public class DefaultS3Operator implements S3Operator {
"s3-write-cb-executor", true, LOGGER);
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);
private final ChecksumAlgorithm checksumForUpload;
private final boolean enableChecksumForGetObject;

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false);
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false, null, false);
}

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate,
String checksumForUpload, boolean enableChecksumForGetObject) {
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
@@ -125,6 +131,8 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
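// ChecksumAlgorithm.fromValue(null) returns null, so requests carry a checksum algorithm only when one is configured.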
this.checksumForUpload = ChecksumAlgorithm.fromValue(checksumForUpload);
this.enableChecksumForGetObject = enableChecksumForGetObject;
scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS);
checkConfig();
S3Utils.S3Context s3Context = S3Utils.S3Context.builder()
@@ -151,6 +159,8 @@ public DefaultS3Operator(S3AsyncClient s3Client, String bucket) {
this.writeS3Client = s3Client;
this.readS3Client = s3Client;
this.bucket = bucket;
this.checksumForUpload = null;
this.enableChecksumForGetObject = false;
this.networkInboundBandwidthLimiter = null;
this.networkOutboundBandwidthLimiter = null;
this.inflightWriteLimiter = new Semaphore(50);
@@ -301,7 +311,12 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
TimerUtil timerUtil = new TimerUtil();
long size = end - start + 1;
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();

GetObjectRequest request = GetObjectRequest.builder()
.checksumMode(enableChecksumForGetObject ? ChecksumMode.ENABLED : null)
.bucket(bucket).key(path)
.range(range(start, end)).build();
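// ChecksumMode.ENABLED asks S3 to return the stored checksum so the SDK can verify the downloaded bytes; a null mode leaves the request unchanged.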

readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size);
@@ -359,7 +374,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
PutObjectRequest request = PutObjectRequest.builder().checksumAlgorithm(checksumForUpload).bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
@@ -437,7 +452,11 @@ public CompletableFuture<String> createMultipartUpload(String path) {

void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(path)
.checksumAlgorithm(checksumForUpload)
.build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(createMultipartUploadResponse.uploadId());
@@ -482,13 +501,39 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
TimerUtil timerUtil = new TimerUtil();
int size = part.readableBytes();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
UploadPartRequest request = UploadPartRequest.builder()
.bucket(bucket)
.key(path)
.uploadId(uploadId)
.partNumber(partNumber)
.checksumAlgorithm(checksumForUpload)
.build();
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
CompletedPart.Builder builder = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag());

if (checksumForUpload != null) {
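// S3 validates each part's checksum at CompleteMultipartUpload, so the checksum returned by UploadPart must be echoed back in the CompletedPart.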
switch (checksumForUpload) {
case CRC32:
builder.checksumCRC32(uploadPartResponse.checksumCRC32());
break;
case CRC32_C:
builder.checksumCRC32C(uploadPartResponse.checksumCRC32C());
break;
case SHA1:
builder.checksumSHA1(uploadPartResponse.checksumSHA1());
break;
case SHA256:
builder.checksumSHA256(uploadPartResponse.checksumSHA256());
break;
case UNKNOWN_TO_SDK_VERSION:
break;
}
}

CompletedPart completedPart = builder.build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
@@ -554,6 +599,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();

// note: we do not currently provide the whole-object checksum when completing the multipart upload.
// see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
// see: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
@@ -657,7 +706,7 @@ public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePa
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders));
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
.apiCallAttemptTimeout(Duration.ofSeconds(30)).retryPolicy(RetryMode.STANDARD));
return builder.build();
}

@@ -874,6 +923,8 @@ public static class Builder {
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
private boolean enableCheckSumForGetObject;
private String checkSumForUploadObject;

public Builder endpoint(String endpoint) {
this.endpoint = endpoint;
@@ -915,9 +966,19 @@ public Builder readWriteIsolate(boolean readWriteIsolate) {
return this;
}

public Builder enableCheckSumForGetObject(boolean enable) {
this.enableCheckSumForGetObject = enable;
return this;
}

public Builder checkSumForUploadObject(String checksum) {
this.checkSumForUploadObject = checksum;
return this;
}

public DefaultS3Operator build() {
return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders,
inboundLimiter, outboundLimiter, readWriteIsolate);
inboundLimiter, outboundLimiter, readWriteIsolate, checkSumForUploadObject, enableCheckSumForGetObject);
}
}
}
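
For reference, a minimal sketch of building an operator with the new options. It assumes a builder() factory and the usual endpoint/region/bucket setters exist on the class (they are not part of this diff), and the endpoint, region, and bucket values are placeholders:

DefaultS3Operator operator = DefaultS3Operator.builder()
    .endpoint("https://s3.example.com")   // placeholder endpoint
    .region("us-east-1")                  // placeholder region
    .bucket("example-bucket")             // placeholder bucket
    .checkSumForUploadObject("CRC32")     // uploads carry a CRC32 checksum
    .enableCheckSumForGetObject(true)     // GetObject requests set ChecksumMode.ENABLED
    .build();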
@@ -88,7 +88,7 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store

S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject());

Codecov / codecov/patch: Added line #L91 in store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java was not covered by tests.

WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
@@ -99,7 +99,7 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
networkInboundLimiter, networkOutboundLimiter, true);
networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject());

Codecov / codecov/patch: Added line #L102 in store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java was not covered by tests.
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);

this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
