Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent a batch of messages going over the threshold #37

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
target/
.idea/
*.iml
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful extra .ignores my end.

Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.Map.Entry;

import com.amazonaws.AmazonClientException;
Expand Down Expand Up @@ -775,15 +773,41 @@ public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessa

List<SendMessageBatchRequestEntry> batchEntries = sendMessageBatchRequest.getEntries();

List<Integer> smallMessages = new LinkedList<>();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this is a two pass process. Move the obviously big ones to s3, catching the indices of those we didn't move.

long totalSize = moveIndividuallyLargeMessagesToS3(batchEntries, smallMessages);

if (isLarge(totalSize) && !smallMessages.isEmpty()) {
moveRemainingMessagesToS3(batchEntries, smallMessages);
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If necessary, then we move the remainder


return super.sendMessageBatch(sendMessageBatchRequest);
}

private void moveRemainingMessagesToS3(List<SendMessageBatchRequestEntry> batchEntries, List<Integer> smallMessages) {
for (Integer smallMessageIndex : smallMessages) {
batchEntries.set(smallMessageIndex, storeMessageInS3(batchEntries.get(smallMessageIndex)));
}
}

private long moveIndividuallyLargeMessagesToS3(List<SendMessageBatchRequestEntry> batchEntries,
List<Integer> smallMessages) {
int index = 0;
long totalSize = 0;
for (SendMessageBatchRequestEntry entry : batchEntries) {
if (clientConfiguration.isAlwaysThroughS3() || isLarge(entry)) {
batchEntries.set(index, storeMessageInS3(entry));
long entrySize = sizeOf(entry);

if (clientConfiguration.isAlwaysThroughS3() || isLarge(entrySize)) {
SendMessageBatchRequestEntry storedVersion = storeMessageInS3(entry);
batchEntries.set(index, storedVersion);
totalSize += sizeOf(storedVersion);
} else {
totalSize += entrySize;
smallMessages.add(index);
}

++index;
}

return super.sendMessageBatch(sendMessageBatchRequest);
return totalSize;
}

/**
Expand Down Expand Up @@ -1215,14 +1239,17 @@ private boolean isLarge(SendMessageRequest sendMessageRequest) {
int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(sendMessageRequest.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
return isLarge(totalMsgSize);
}

private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
private boolean isLarge(long size) {
return (size > clientConfiguration.getMessageSizeThreshold());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reused isLarge moved into its own function.

}

private long sizeOf(SendMessageBatchRequestEntry batchEntry) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizeOf means we can work out the sums of batch members.

int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
return msgAttributesSize + msgBodySize;
}

private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class AmazonSQSExtendedClientTest {
private static final int MORE_THAN_SQS_SIZE_LIMIT = SQS_SIZE_LIMIT + 1;

// should be > 1 and << SQS_SIZE_LIMIT
private static final int ARBITRATY_SMALLER_THRESSHOLD = 500;
private static final int ARBITRARY_SMALLER_THRESHOLD = 500;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo


@Before
public void setupClient() {
Expand Down Expand Up @@ -128,10 +128,10 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill

@Test
public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored() {
int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2;
int messageLength = ARBITRARY_SMALLER_THRESHOLD * 2;
String messageBody = generateStringWithLength(messageLength);
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD);
.withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRARY_SMALLER_THRESHOLD);

AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration));

Expand All @@ -158,9 +158,43 @@ public void testReceiveMessageMultipleTimesDoesNotAdditionallyAlterReceiveMessag
Assert.assertEquals(expectedRequest, messageRequest);
}

@Test
public void testWhenSmallMessageBatchIsSentThenNoMessagesStoredInS3() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test that proves that a perfectly small batch is not moved to s3

// This creates 10 messages all well within the threshold

int[] messageLengthForCounter = new int[] {
1_000,
1_000,
1_000,
1_000,
1_000,
1_000,
1_000,
1_000,
1_000,
1_000
};

List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<SendMessageBatchRequestEntry>();
for (int i = 0; i < 10; i++) {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
int messageLength = messageLengthForCounter[i];
String messageBody = generateStringWithLength(messageLength);
entry.setMessageBody(messageBody);
entry.setId("entry_" + i);
batchEntries.add(entry);
}

SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries);
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest);

// There should be no puts
verify(mockS3, never()).putObject(isA(PutObjectRequest.class));
}

@Test
public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStoredInS3() {
// This creates 10 messages, out of which only two are below the threshold (100K and 200K),
// This creates 10 messages, out of which only two are below the threshold (100K and 20K),
// and the other 8 are above the threshold

int[] messageLengthForCounter = new int[] {
Expand All @@ -172,7 +206,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor
700_000,
800_000,
900_000,
200_000,
20_000,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to make this smaller so that the messages not sent to s3 did not themselves exceed the batch size in this instance.

1000_000
};

Expand All @@ -189,10 +223,45 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor
SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries);
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest);

// There should be 8 puts for the 8 messages above the threshhold
// There should be 8 puts for the 8 messages above the threshold
verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class));
}

@Test
public void testWhenMessageBatchIsSentWhereSumOfMessageSizesIsOverTheThresholdThenAllArePutToS3() {
// This creates 10 messages, all of which are below the threshold, but together would make
// a single request over the threshold

int[] messageLengthForCounter = new int[] {
26_214,
26_214,
26_214,
26_214,
26_214,
26_214,
26_214,
26_214,
26_214,
26_219
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small messages which just exceed the batch max

};

List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<SendMessageBatchRequestEntry>();
for (int i = 0; i < 10; i++) {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
int messageLength = messageLengthForCounter[i];
String messageBody = generateStringWithLength(messageLength);
entry.setMessageBody(messageBody);
entry.setId("entry_" + i);
batchEntries.add(entry);
}

SendMessageBatchRequest batchRequest = new SendMessageBatchRequest(SQS_QUEUE_URL, batchEntries);
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest);

// There should be 10 puts, as the whole batch is moved to s3
verify(mockS3, times(10)).putObject(isA(PutObjectRequest.class));
}

@Test
public void testWhenSmallMessageIsSentThenNoAttributeIsAdded() {
int messageLength = LESS_THAN_SQS_SIZE_LIMIT;
Expand Down