Skip to content

Commit

Permalink
Add acknowledgement mode on @SqsListener annotation (#870)
Browse files Browse the repository at this point in the history
* Add ack mode parameter to SqsListener annotation

Fixes gh-761

* Remove unrelated file from PR

* Fix style differences

* Rename acknowledgement enum to be consistent with other nomenclatures

* Update docs with @SqsListener acknowledgement mode param

* Add JavaDoc for method and test enum value

- Added missing JavaDoc for getAcknowledgementMode public method
- Added test to ensure that new SqsListenerAcknowledgementMode enum will
  have the same values as the original AcknowledgementMode enum

* Change annotation ack from enum to string

- Change SqsListenerAcknowledgementMode from enum to class with const
  strings
- Change typo on "acknowledgement" annotation field
- Remove unnecessary field on SqsContainerOptionsBuilder
- Move integration tests to its own test suite

* Update documentation and code comments

- Update docs to reflect new "DEFAULT" annotation ack mode
- Update comments with latest changes

* Set empty string as default behavior

- Remove "DEFAULT" annotation acknowledgement mode, use empty string as
  default
- Update docs

* Add author after latest changes

- Add author on modified classes

* Fix inverted condition when resolving AcknowledgementMode value
  • Loading branch information
João Victor Calassio authored Nov 4, 2023
1 parent f7eb0ab commit 273f8f9
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 16 deletions.
6 changes: 5 additions & 1 deletion docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,9 @@ Note that if there are messages available the call may return earlier than this
- `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll.
Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener.
See <<FIFO Support>> for more information.

- `acknowledgementMode` - Set the acknowledgement mode for the container.
If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options.
See <<Acknowledgement Mode>> for more information.

===== Listener Method Arguments

Expand Down Expand Up @@ -1326,6 +1328,8 @@ NOTE: All options are available for both `single message` and `batch` message li
- `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error.
- `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method.

The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation.

==== Acknowledgement Batching

The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,10 @@
import io.awspring.cloud.sqs.config.HandlerMethodEndpoint;
import io.awspring.cloud.sqs.config.SqsEndpoint;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.support.resolver.AcknowledgmentHandlerMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchAcknowledgmentArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -69,11 +57,26 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* {@link BeanPostProcessor} implementation that scans beans for a {@link SqsListener @SqsListener} annotation, extracts
* information to a {@link SqsEndpoint}, and registers it in the {@link EndpointRegistrar}.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public abstract class AbstractListenerAnnotationBeanPostProcessor<A extends Annotation>
Expand Down Expand Up @@ -219,6 +222,17 @@ protected Integer resolveAsInteger(String value, String propertyName) {
}
}

@Nullable
protected AcknowledgementMode resolveAcknowledgement(String value) {
try {
final String resolvedValue = resolveAsString(value, "acknowledgementMode");
return StringUtils.hasText(resolvedValue) ? AcknowledgementMode.valueOf(resolvedValue) : null;
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Cannot resolve " + value + " as AcknowledgementMode", e);
}
}

protected String getEndpointId(String id) {
if (StringUtils.hasText(id)) {
return resolveAsString(id, "id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* @author Alain Sahli
* @author Matej Nedic
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 1.1
*/
@Target(ElementType.METHOD)
Expand Down Expand Up @@ -137,4 +138,10 @@
*/
String messageVisibilitySeconds() default "";

/**
* The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined
* for the container factory will be used.
*/
String acknowledgementMode() default "";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AlwaysAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.NeverAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.OnSuccessAcknowledgementHandler;

/**
* Acknowledgement strategies supported by the {@link SqsListener} annotation.
*
* @author Joao Calassio
* @since 3.1
* @see OnSuccessAcknowledgementHandler
* @see AlwaysAcknowledgementHandler
* @see NeverAcknowledgementHandler
* @see io.awspring.cloud.sqs.listener.ContainerOptions
* @see SqsListener
*/
public class SqsListenerAcknowledgementMode {

/**
* Messages will be acknowledged when message processing is successful.
*/
public static final String ON_SUCCESS = "ON_SUCCESS";

/**
* Messages will be acknowledged whether processing was completed successfully or with an error.
*/
public static final String ALWAYS = "ALWAYS";

/**
* Messages will not be acknowledged automatically by the container.
* @see io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement
*/
public static final String MANUAL = "MANUAL";

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor<SqsListener> {
Expand All @@ -51,7 +52,7 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
.messageVisibility(
resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
.build();
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.awspring.cloud.sqs.config;

import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.time.Duration;
import java.util.Collection;
import org.springframework.lang.Nullable;
Expand All @@ -26,6 +27,7 @@
* Contains properties that should be mapped from {@link SqsListener @SqsListener} annotations.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public class SqsEndpoint extends AbstractEndpoint {
Expand All @@ -38,12 +40,16 @@ public class SqsEndpoint extends AbstractEndpoint {

private final Integer maxMessagesPerPoll;

@Nullable
private final AcknowledgementMode acknowledgementMode;

protected SqsEndpoint(SqsEndpointBuilder builder) {
super(builder.queueNames, builder.factoryName, builder.id);
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
this.messageVisibility = builder.messageVisibility;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.acknowledgementMode = builder.acknowledgementMode;
}

/**
Expand Down Expand Up @@ -91,6 +97,15 @@ public Duration getMessageVisibility() {
return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
}

/**
* Returns the acknowledgement mode configured for this endpoint.
* @return the acknowledgement mode.
*/
@Nullable
public AcknowledgementMode getAcknowledgementMode() {
return this.acknowledgementMode;
}

public static class SqsEndpointBuilder {

private Collection<String> queueNames;
Expand All @@ -107,6 +122,9 @@ public static class SqsEndpointBuilder {

private Integer maxMessagesPerPoll;

@Nullable
private AcknowledgementMode acknowledgementMode;

public SqsEndpointBuilder queueNames(Collection<String> queueNames) {
this.queueNames = queueNames;
return this;
Expand Down Expand Up @@ -142,6 +160,11 @@ public SqsEndpointBuilder id(String id) {
return this;
}

public SqsEndpointBuilder acknowledgementMode(@Nullable AcknowledgementMode acknowledgementMode) {
this.acknowledgementMode = acknowledgementMode;
return this;
}

public SqsEndpoint build() {
return new SqsEndpoint(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
* used.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
* @see SqsMessageListenerContainer
* @see ContainerOptions
Expand Down Expand Up @@ -161,7 +162,8 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio
ConfigUtils.INSTANCE.acceptIfNotNull(sqsEndpoint.getMaxConcurrentMessages(), options::maxConcurrentMessages)
.acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll)
.acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout)
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility);
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility)
.acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.lang.reflect.Field;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Tests for {@link SqsListenerAcknowledgementMode} enum values
*
* @author Joao Calassio
*/
class SqsListenerAcknowledgementModeTests {

@ParameterizedTest
@EnumSource(AcknowledgementMode.class)
void shouldHaveAllValuesOfAcknowledgementModeEnum(final AcknowledgementMode acknowledgementMode)
throws NoSuchFieldException, IllegalAccessException {
Class<SqsListenerAcknowledgementMode> clz = SqsListenerAcknowledgementMode.class;
Field correspondingValue = clz.getDeclaredField(acknowledgementMode.name());
assertEquals(acknowledgementMode.name(), correspondingValue.get(clz));
}

}
Loading

0 comments on commit 273f8f9

Please sign in to comment.