Skip to content

Commit

Permalink
fix(s3stream): fix operation latency unit (AutoMQ#892)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Jan 10, 2024
1 parent 069b38d commit 87098f5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
10 changes: 5 additions & 5 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
pendingAppends.remove(cf);
});
return cf;
Expand Down Expand Up @@ -196,7 +196,7 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
Throwable cause = FutureUtil.cause(ex);
if (!(cause instanceof FastReadFailFastException)) {
Expand Down Expand Up @@ -260,7 +260,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
return cf;
}, LOGGER, "trim");
Expand Down Expand Up @@ -310,10 +310,10 @@ public CompletableFuture<Void> close() {
closeCf.whenComplete((nil, ex) -> {
if (ex != null) {
LOGGER.error("{} close fail", logIdent, ex);
StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
} else {
LOGGER.info("{} closed", logIdent);
StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND)
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -316,12 +316,12 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
path, start, end, size, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}
S3OperationStats.getInstance().downloadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().getObjectStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().getObjectStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(buf);
});
})
.exceptionally(ex -> {
S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
cf.completeExceptionally(ex);
Expand All @@ -344,7 +344,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
} else {
Expand All @@ -364,12 +364,12 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
writeS3Client.putObject(request, body).thenAccept(putObjectResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, objectSize);
S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().putObjectStats(objectSize, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.debug("put object {} with size {}, cost {}ms", path, objectSize, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
data.release();
}).exceptionally(ex -> {
S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().putObjectStats(objectSize, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("PutObject for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand All @@ -392,10 +392,10 @@ public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object finished, path: {}, cost: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
}).exceptionally(ex -> {
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete object failed, path: {}, cost: {}, ex: {}", path, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return null;
});
Expand All @@ -415,11 +415,11 @@ public CompletableFuture<List<String>> delete(List<String> objectKeys) {
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().deleteObjectsStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
Expand All @@ -440,10 +440,10 @@ void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().createMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(createMultipartUploadResponse.uploadId());
}).exceptionally(ex -> {
S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().createMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CreateMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -488,11 +488,11 @@ private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf p
CompletableFuture<UploadPartResponse> uploadPartCf = writeS3Client.uploadPart(request, body);
uploadPartCf.thenAccept(uploadPartResponse -> {
S3OperationStats.getInstance().uploadSizeTotalStats.add(MetricsLevel.INFO, size);
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("UploadPart for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -523,12 +523,12 @@ private void uploadPartCopy0(String sourcePath, String path, long start, long en
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
writeS3Client.uploadPartCopy(request).thenAccept(uploadPartCopyResponse -> {
S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartCopyStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
CompletedPart completedPart = CompletedPart.builder().partNumber(partNumber)
.eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
cf.complete(completedPart);
}).exceptionally(ex -> {
S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().uploadPartCopyStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.warn("UploadPartCopy for object {}-{} fail", path, partNumber, ex);
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -558,10 +558,10 @@ public void completeMultipartUpload0(String path, String uploadId, List<Complete
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

writeS3Client.completeMultipartUpload(request).thenAccept(completeMultipartUploadResponse -> {
S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().completeMultiPartUploadStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
cf.complete(null);
}).exceptionally(ex -> {
S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
S3OperationStats.getInstance().completeMultiPartUploadStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (isUnrecoverable(ex)) {
LOGGER.error("CompleteMultipartUpload for object {} fail", path, ex);
cf.completeExceptionally(ex);
Expand Down

0 comments on commit 87098f5

Please sign in to comment.