diff --git a/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java b/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java index 38f555d6e..5ce9eb34a 100644 --- a/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java +++ b/common/src/main/java/com/automq/rocketmq/common/config/S3StreamConfig.java @@ -43,6 +43,8 @@ public class S3StreamConfig { private int streamSetObjectCompactionForceSplitPeriod = 120; private int streamSetObjectCompactionMaxObjectNum = 500; private int streamSplitSizeThreshold = 16777216; + private String checksumForUpload = null; + private boolean enableChecksumForGetObject = false; public String s3Endpoint() { return s3Endpoint; @@ -135,4 +137,12 @@ public int streamSetObjectCompactionMaxObjectNum() { public int streamSplitSizeThreshold() { return streamSplitSizeThreshold; } + + public String checksumForUpload() { + return checksumForUpload; + } + + public boolean isEnableChecksumForGetObject() { + return enableChecksumForGetObject; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index a742a4883..326446249 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -61,10 +61,13 @@ import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; +import software.amazon.awssdk.services.s3.model.ChecksumMode; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CompletedPart; @@ -107,16 +110,19 @@ public class DefaultS3Operator implements S3Operator { "s3-write-cb-executor", true, LOGGER); private final HashedWheelTimer timeoutDetect = new HashedWheelTimer( ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100); + private final ChecksumAlgorithm checksumForUpload; + private final boolean enableChecksumForGetObject; public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, List credentialsProviders) { - this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false); + this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false, null, false); } public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, - List credentialsProviders, - AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, - AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) { + List credentialsProviders, + AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter, + AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate, + String checksumForUpload, boolean enableChecksumForGetObject) { this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate(); this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter; this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter; @@ -125,6 +131,8 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean this.inflightWriteLimiter = new Semaphore(50); this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter; this.bucket = bucket; + this.checksumForUpload = ChecksumAlgorithm.fromValue(checksumForUpload); + this.enableChecksumForGetObject = enableChecksumForGetObject; scheduler.scheduleWithFixedDelay(this::tryMergeRead, 1, 1, TimeUnit.MILLISECONDS); checkConfig(); S3Utils.S3Context s3Context = S3Utils.S3Context.builder() @@ -151,6 +159,8 @@ public DefaultS3Operator(S3AsyncClient s3Client, String bucket) { this.writeS3Client = s3Client; this.readS3Client = s3Client; this.bucket = bucket; + this.checksumForUpload = null; + this.enableChecksumForGetObject = false; this.networkInboundBandwidthLimiter = null; this.networkOutboundBandwidthLimiter = null; this.inflightWriteLimiter = new Semaphore(50); @@ -301,7 +311,12 @@ CompletableFuture mergedRangeRead(String path, long start, long end) { void mergedRangeRead0(String path, long start, long end, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); long size = end - start + 1; - GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build(); + + GetObjectRequest request = GetObjectRequest.builder() + .checksumMode(enableChecksumForGetObject ? ChecksumMode.ENABLED : null) + .bucket(bucket).key(path) + .range(range(start, end)).build(); + readS3Client.getObject(request, AsyncResponseTransformer.toPublisher()) .thenAccept(responsePublisher -> { S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size); @@ -359,7 +374,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy private void write0(String path, ByteBuf data, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); int objectSize = data.readableBytes(); - PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build(); + PutObjectRequest request = PutObjectRequest.builder().checksumAlgorithm(checksumForUpload).bucket(bucket).key(path).build(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers()); writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize); @@ -437,7 +452,11 @@ public CompletableFuture createMultipartUpload(String path) { void createMultipartUpload0(String path, CompletableFuture cf) { TimerUtil timerUtil = new TimerUtil(); - CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build(); + CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder() + .bucket(bucket) + .key(path) + .checksumAlgorithm(checksumForUpload) + .build(); writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> { S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); cf.complete(createMultipartUploadResponse.uploadId()); @@ -482,13 +501,39 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p TimerUtil timerUtil = new TimerUtil(); int size = part.readableBytes(); AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers()); - UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId) - .partNumber(partNumber).build(); + UploadPartRequest request = UploadPartRequest.builder() + .bucket(bucket) + .key(path) + .uploadId(uploadId) + .partNumber(partNumber) + .checksumAlgorithm(checksumForUpload) + .build(); CompletableFuture uploadPartCf = writeS3Client.uploadPart(request, body); uploadPartCf.thenAccept(uploadPartResponse -> { S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size); S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); - CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build(); + CompletedPart.Builder builder = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()); + + if (checksumForUpload != null) { + switch (checksumForUpload) { + case CRC32: + builder.checksumCRC32(uploadPartResponse.checksumCRC32()); + break; + case CRC32_C: + builder.checksumCRC32C(uploadPartResponse.checksumCRC32C()); + break; + case SHA1: + builder.checksumSHA1(uploadPartResponse.checksumSHA1()); + break; + case SHA256: + builder.checksumSHA256(uploadPartResponse.checksumSHA256()); + break; + case UNKNOWN_TO_SDK_VERSION: + break; + } + } + + CompletedPart completedPart = builder.build(); cf.complete(completedPart); }).exceptionally(ex -> { S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)); @@ -554,6 +599,10 @@ public void completeMultipartUpload0(String path, String uploadId, List cf) { TimerUtil timerUtil = new TimerUtil(); CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build(); + + // note: current not provide the whole object checksum when complete the multipartUpload. + // see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + // see: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build(); writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> { @@ -657,7 +706,7 @@ public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePa builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle)); builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders)); builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1)) - .apiCallAttemptTimeout(Duration.ofSeconds(30))); + .apiCallAttemptTimeout(Duration.ofSeconds(30)).retryPolicy(RetryMode.STANDARD)); return builder.build(); } @@ -874,6 +923,8 @@ public static class Builder { private AsyncNetworkBandwidthLimiter inboundLimiter; private AsyncNetworkBandwidthLimiter outboundLimiter; private boolean readWriteIsolate; + private boolean enableCheckSumForGetObject; + private String checkSumForUploadObject; public Builder endpoint(String endpoint) { this.endpoint = endpoint; @@ -915,9 +966,19 @@ public Builder readWriteIsolate(boolean readWriteIsolate) { return this; } + public Builder enableCheckSumForGetObject(boolean enable) { + this.enableCheckSumForGetObject = enable; + return this; + } + + public Builder checkSumForUploadObject(String checksum) { + this.checkSumForUploadObject = checksum; + return this; + } + public DefaultS3Operator build() { return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders, - inboundLimiter, outboundLimiter, readWriteIsolate); + inboundLimiter, outboundLimiter, readWriteIsolate, checkSumForUploadObject, enableCheckSumForGetObject); } } } diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java index 6e6515582..cb1cfc982 100644 --- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java +++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java @@ -88,7 +88,7 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())), - networkInboundLimiter, networkOutboundLimiter, true); + networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject()); WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build(); S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator); @@ -99,7 +99,7 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store // Build the compaction manager S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(), streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())), - networkInboundLimiter, networkOutboundLimiter, true); + networkInboundLimiter, networkOutboundLimiter, true, streamConfig.checksumForUpload(), streamConfig.isEnableChecksumForGetObject()); this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator); this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);