-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent a batch of messages going over the threshold #37
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
target/ | ||
.idea/ | ||
*.iml | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,9 +22,7 @@ | |
import java.io.Writer; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.*; | ||
import java.util.Map.Entry; | ||
|
||
import com.amazonaws.AmazonClientException; | ||
|
@@ -775,15 +773,41 @@ public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessa | |
|
||
List<SendMessageBatchRequestEntry> batchEntries = sendMessageBatchRequest.getEntries(); | ||
|
||
List<Integer> smallMessages = new LinkedList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now this is a two pass process. Move the obviously big ones to s3, catching the indices of those we didn't move. |
||
long totalSize = moveIndividuallyLargeMessagesToS3(batchEntries, smallMessages); | ||
|
||
if (isLarge(totalSize) && !smallMessages.isEmpty()) { | ||
moveRemainingMessagesToS3(batchEntries, smallMessages); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If necessary, then we move the remainder |
||
|
||
return super.sendMessageBatch(sendMessageBatchRequest); | ||
} | ||
|
||
private void moveRemainingMessagesToS3(List<SendMessageBatchRequestEntry> batchEntries, List<Integer> smallMessages) { | ||
for (Integer smallMessageIndex : smallMessages) { | ||
batchEntries.set(smallMessageIndex, storeMessageInS3(batchEntries.get(smallMessageIndex))); | ||
} | ||
} | ||
|
||
private long moveIndividuallyLargeMessagesToS3(List<SendMessageBatchRequestEntry> batchEntries, | ||
List<Integer> smallMessages) { | ||
int index = 0; | ||
long totalSize = 0; | ||
for (SendMessageBatchRequestEntry entry : batchEntries) { | ||
if (clientConfiguration.isAlwaysThroughS3() || isLarge(entry)) { | ||
batchEntries.set(index, storeMessageInS3(entry)); | ||
long entrySize = sizeOf(entry); | ||
|
||
if (clientConfiguration.isAlwaysThroughS3() || isLarge(entrySize)) { | ||
SendMessageBatchRequestEntry storedVersion = storeMessageInS3(entry); | ||
batchEntries.set(index, storedVersion); | ||
totalSize += sizeOf(storedVersion); | ||
} else { | ||
totalSize += entrySize; | ||
smallMessages.add(index); | ||
} | ||
|
||
++index; | ||
} | ||
|
||
return super.sendMessageBatch(sendMessageBatchRequest); | ||
return totalSize; | ||
} | ||
|
||
/** | ||
|
@@ -1215,14 +1239,17 @@ private boolean isLarge(SendMessageRequest sendMessageRequest) { | |
int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.getMessageAttributes()); | ||
long msgBodySize = getStringSizeInBytes(sendMessageRequest.getMessageBody()); | ||
long totalMsgSize = msgAttributesSize + msgBodySize; | ||
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold()); | ||
return isLarge(totalMsgSize); | ||
} | ||
|
||
private boolean isLarge(SendMessageBatchRequestEntry batchEntry) { | ||
private boolean isLarge(long size) { | ||
return (size > clientConfiguration.getMessageSizeThreshold()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reused |
||
} | ||
|
||
private long sizeOf(SendMessageBatchRequestEntry batchEntry) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes()); | ||
long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody()); | ||
long totalMsgSize = msgAttributesSize + msgBodySize; | ||
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold()); | ||
return msgAttributesSize + msgBodySize; | ||
} | ||
|
||
private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,7 +61,7 @@ public class AmazonSQSExtendedClientTest { | |
private static final int MORE_THAN_SQS_SIZE_LIMIT = SQS_SIZE_LIMIT + 1; | ||
|
||
// should be > 1 and << SQS_SIZE_LIMIT | ||
private static final int ARBITRATY_SMALLER_THRESSHOLD = 500; | ||
private static final int ARBITRARY_SMALLER_THRESHOLD = 500; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo |
||
|
||
@Before | ||
public void setupClient() { | ||
|
@@ -128,10 +128,10 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill | |
|
||
@Test | ||
public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored() { | ||
int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2; | ||
int messageLength = ARBITRARY_SMALLER_THRESHOLD * 2; | ||
String messageBody = generateStringWithLength(messageLength); | ||
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() | ||
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD); | ||
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRARY_SMALLER_THRESHOLD); | ||
|
||
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration)); | ||
|
||
|
@@ -158,9 +158,43 @@ public void testReceiveMessageMultipleTimesDoesNotAdditionallyAlterReceiveMessag | |
Assert.assertEquals(expectedRequest, messageRequest); | ||
} | ||
|
||
@Test | ||
public void testWhenSmallMessageBatchIsSentThenNoMessagesStoredInS3() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing test that proves that a perfectly small batch is not moved to s3 |
||
// This creates 10 messages all well within the threshold | ||
|
||
int[] messageLengthForCounter = new int[] { | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000, | ||
1_000 | ||
}; | ||
|
||
List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<SendMessageBatchRequestEntry>(); | ||
for (int i = 0; i < 10; i++) { | ||
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); | ||
int messageLength = messageLengthForCounter[i]; | ||
String messageBody = generateStringWithLength(messageLength); | ||
entry.setMessageBody(messageBody); | ||
entry.setId("entry_" + i); | ||
batchEntries.add(entry); | ||
} | ||
|
||
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); | ||
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); | ||
|
||
// There should be no puts | ||
verify(mockS3, never()).putObject(isA(PutObjectRequest.class)); | ||
} | ||
|
||
@Test | ||
public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStoredInS3() { | ||
// This creates 10 messages, out of which only two are below the threshold (100K and 200K), | ||
// This creates 10 messages, out of which only two are below the threshold (100K and 20K), | ||
// and the other 8 are above the threshold | ||
|
||
int[] messageLengthForCounter = new int[] { | ||
|
@@ -172,7 +206,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor | |
700_000, | ||
800_000, | ||
900_000, | ||
200_000, | ||
20_000, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed to make this smaller so that the messages not sent to s3 did not themselves exceed the batch size in this instance. |
||
1000_000 | ||
}; | ||
|
||
|
@@ -189,10 +223,45 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor | |
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); | ||
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); | ||
|
||
// There should be 8 puts for the 8 messages above the threshhold | ||
// There should be 8 puts for the 8 messages above the threshold | ||
verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class)); | ||
} | ||
|
||
@Test | ||
public void testWhenMessageBatchIsSentWhereSumOfMessageSizesIsOverTheThresholdThenAllArePutToS3() { | ||
// This creates 10 messages, all of which are below the threshold, but together would make | ||
// a single request over the threshold | ||
|
||
int[] messageLengthForCounter = new int[] { | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_214, | ||
26_219 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small messages which just exceed the batch max |
||
}; | ||
|
||
List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<SendMessageBatchRequestEntry>(); | ||
for (int i = 0; i < 10; i++) { | ||
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); | ||
int messageLength = messageLengthForCounter[i]; | ||
String messageBody = generateStringWithLength(messageLength); | ||
entry.setMessageBody(messageBody); | ||
entry.setId("entry_" + i); | ||
batchEntries.add(entry); | ||
} | ||
|
||
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries); | ||
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest); | ||
|
||
// There should be 10 puts, as the whole batch is moved to s3 | ||
verify(mockS3, times(10)).putObject(isA(PutObjectRequest.class)); | ||
} | ||
|
||
@Test | ||
public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() { | ||
int messageLength = LESS_THAN_SQS_SIZE_LIMIT; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useful extra
.ignore
s my end.