From efb3a9bd5bfab3f53970c6cc605fb83a9b790f9e Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Wed, 13 Mar 2024 10:17:06 -0700 Subject: [PATCH] Update to use batch payload delete, and to properly chain deletes for async. --- pom.xml | 4 ++-- .../AmazonSQSExtendedAsyncClient.java | 20 +++++++++++++------ .../AmazonSQSExtendedClient.java | 8 +++++++- .../AmazonSQSExtendedAsyncClientTest.java | 6 +++++- .../AmazonSQSExtendedClientTest.java | 3 ++- 5 files changed, 30 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index 9245270..7111582 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazonaws amazon-sqs-java-extended-client-lib - 2.1.0 + 2.1.1 jar Amazon SQS Extended Client Library for Java An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3. @@ -57,7 +57,7 @@ software.amazon.payloadoffloading payloadoffloading-common - 2.2.0 + 2.2.1 diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java index 02404a2..49690af 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java @@ -298,10 +298,8 @@ public CompletableFuture deleteMessage(DeleteMessageReque // Delete from SQS first, then S3. final String messageToDeletePointer = messagePointer; - return super.deleteMessage(deleteMessageRequestBuilder.build()) - .thenCompose(deleteMessageResponse -> - payloadStore.deleteOriginalPayload(messageToDeletePointer) - .thenApply(v -> deleteMessageResponse)); + return payloadStore.deleteOriginalPayload(messageToDeletePointer) + .thenCompose(ignore -> super.deleteMessage(deleteMessageRequestBuilder.build())); } /** @@ -395,6 +393,7 @@ public CompletableFuture deleteMessageBatch( } List entries = new ArrayList<>(deleteMessageBatchRequest.entries().size()); + List s3ToCleanup = new ArrayList<>(deleteMessageBatchRequest.entries().size()); for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.entries()) { DeleteMessageBatchRequestEntry.Builder entryBuilder = entry.toBuilder(); String receiptHandle = entry.receiptHandle(); @@ -406,7 +405,7 @@ public CompletableFuture deleteMessageBatch( // Delete s3 payload if needed if (clientConfiguration.doesCleanupS3Payload()) { String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle); - payloadStore.deleteOriginalPayload(messagePointer); + s3ToCleanup.add(messagePointer); } } @@ -415,7 +414,16 @@ public CompletableFuture deleteMessageBatch( } deleteMessageBatchRequestBuilder.entries(entries); - return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build()); + + // Check if message is in S3 or only in SQS. + if (s3ToCleanup.isEmpty()) { + // Delete only from SQS + return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build()); + } + + // Delete from S3 first, then SQS. + return payloadStore.deleteOriginalPayloads(s3ToCleanup) + .thenCompose(ignore -> super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build())); } /** diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index 68317d2..f9373b6 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -698,6 +698,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d } List entries = new ArrayList<>(deleteMessageBatchRequest.entries().size()); + List s3ToCleanup = new ArrayList<>(deleteMessageBatchRequest.entries().size()); for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.entries()) { DeleteMessageBatchRequestEntry.Builder entryBuilder = entry.toBuilder(); String receiptHandle = entry.receiptHandle(); @@ -709,7 +710,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d // Delete s3 payload if needed if (clientConfiguration.doesCleanupS3Payload()) { String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle); - payloadStore.deleteOriginalPayload(messagePointer); + s3ToCleanup.add(messagePointer); } } @@ -718,6 +719,11 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d } deleteMessageBatchRequestBuilder.entries(entries); + + if (!s3ToCleanup.isEmpty()) { + payloadStore.deleteOriginalPayloads(s3ToCleanup); + } + return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build()); } diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java index eb7de08..29d3794 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java @@ -38,6 +38,8 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -91,6 +93,8 @@ public void setupClients() { CompletableFuture.completedFuture(null)); when(mockS3.deleteObject(isA(DeleteObjectRequest.class))).thenReturn( CompletableFuture.completedFuture(DeleteObjectResponse.builder().build())); + when(mockS3.deleteObjects(isA(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build())); when(mockSqsBackend.sendMessage(isA(SendMessageRequest.class))).thenReturn( CompletableFuture.completedFuture(SendMessageResponse.builder().build())); when(mockSqsBackend.sendMessageBatch(isA(SendMessageBatchRequest.class))).thenReturn( @@ -590,7 +594,7 @@ public void testDefaultExtendedClientDeletesObjectsFromS3UponDeleteBatch() { IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals( originalReceiptHandles.get(i), request.entries().get(i).receiptHandle())); - verify(mockS3, times(batchSize)).deleteObject(any(DeleteObjectRequest.class)); + verify(mockS3, times(1)).deleteObjects(any(DeleteObjectsRequest.class)); } @Test diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index fd58b0b..f29c5e7 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -29,6 +29,7 @@ import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -621,7 +622,7 @@ public void testDefaultExtendedClientDeletesObjectsFromS3UponDeleteBatch() { IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals( originalReceiptHandles.get(i), request.entries().get(i).receiptHandle())); - verify(mockS3, times(batchSize)).deleteObject(any(DeleteObjectRequest.class)); + verify(mockS3, times(1)).deleteObjects(any(DeleteObjectsRequest.class)); } @Test