Acknowledgement failed when the application graceful shutdown #925

Closed
XUmeng96 opened this issue Oct 27, 2023 · 9 comments
Labels: component: sqs (SQS integration related issue), type: bug (Something isn't working)

Comments

@XUmeng96

Type: Bug

Component: SQS

Describe the bug
Version:

JDK: 17
spring-cloud-aws-dependencies: 3.0.2
spring-cloud-dependencies: 2022.0.4
amazon-sqs-java-messaging-lib: 2.1.1
spring-boot-dependencies: 3.1.4
amazon.awssdk: 2.21.0

Error:
I use an @SqsListener and a MessageListenerContainerFactory bean to create the MessageListenerContainer, with batching acknowledgement of messages.
When I shut down the application, I sometimes get error messages; other times there is no error, but the messages stay in flight until the visibility timeout expires instead of being acknowledged.
I would like the acknowledgement to complete even when the shutdown hook has been triggered.
I looked at the doStop() method in BatchingAcknowledgementProcessor. It only waits for runningAcks until the acknowledgementShutdownTimeout. If there are messages in the BlockingQueue acks that have not yet been moved to the Map<String, BlockingQueue<Message>> acksBuffer, those messages will not be acknowledged, right?
I'm not sure why we only wait for runningAcks to finish. Is there a way to ensure that all messages that have been processed are acknowledged?

Sample
MessageListenerContainerFactory config as follows:

@Bean("sqsFactory")
SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {

       return SqsMessageListenerContainerFactory.builder().configure(
                       options -> options.acknowledgementMode(AcknowledgementMode.MANUAL)
                               .listenerMode(ListenerMode.SINGLE_MESSAGE)
                               .maxConcurrentMessages(100)
                               .maxMessagesPerPoll(100)
                               .listenerShutdownTimeout(Duration.of(25L, ChronoUnit.SECONDS))
                               .acknowledgementShutdownTimeout(Duration.of(20L, ChronoUnit.SECONDS))
                               .acknowledgementThreshold(5)
                               .acknowledgementInterval(Duration.of(50, ChronoUnit.MILLIS))
                               .queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
               ).sqsAsyncClient(sqsAsyncClient)
               .build();
   }

And I simulate the business logic by sleeping for 15 seconds:

@SqsListener(value = "webhook-performance-test-sqs", factory = "sqsFactory")
public void queueListener(@Payload String sqsBatchMessage, Acknowledgement acknowledgement) {
    try {
        Thread.sleep(15000L);
        log.info("business process finished....");
    } catch (Exception ex) {
        log.error("Failed.", ex);
    } finally {
        acknowledgement.acknowledgeAsync();
        log.info("acknowledgment finish ....");
    }
}
@maciejwalkowiak maciejwalkowiak added the component: sqs SQS integration related issue label Nov 6, 2023
@tomazfernandes
Contributor

Hey @XUmeng96, sorry for the delay.

You're right that there's a race condition there where the ack processor may stop before all acks are added to the queue. Maybe it's worth adding logic to also wait for the queue to be empty.

Nevertheless, considering we have a dedicated thread that pretty much just waits on this queue and adds messages to the buffer, I'd think more often than not we wouldn't have this issue.
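
For illustration, waiting for the queue to drain could look roughly like this - this is not the actual framework code; the acks field name and the polling approach are assumptions based on the discussion above:

// Hypothetical sketch, not the actual BatchingAcknowledgementProcessor code.
// Before waiting for runningAcks, doStop() could also wait for the incoming acks
// queue to drain, so acks that were offered but not yet moved to acksBuffer
// still get executed before shutdown.
private boolean waitAcksQueueToDrain(Duration timeout) {
    long deadline = System.currentTimeMillis() + timeout.toMillis();
    while (!this.acks.isEmpty() && System.currentTimeMillis() < deadline) {
        try {
            Thread.sleep(50); // let the dedicated thread move entries to the buffer
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    return this.acks.isEmpty();
}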

What you are seeing seems much more like an effect of this logic where we ignore new messages for acknowledgement after we receive the order to stop:

public CompletableFuture<Void> onAcknowledge(Message<T> message) {
    if (!isRunning()) {
        logger.debug("{} not running, returning for message {}", this.id, MessageHeaderUtils.getId(message));
        return CompletableFuture.completedFuture(null);
    }
    logger.trace("Received message {} to acknowledge.", MessageHeaderUtils.getId(message));
    return doOnAcknowledge(message);
}

Also, keep in mind that the message source only signals the acknowledgement processor to stop after all messages have completed processing - or after the shutdownTimeout, if it comes first.

public void stop() {
    if (!isRunning()) {
        logger.debug("{} for queue {} not running", getClass().getSimpleName(), this.pollingEndpointName);
    }
    synchronized (this.lifecycleMonitor) {
        logger.debug("Stopping {} for queue {}", getClass().getSimpleName(), this.pollingEndpointName);
        this.running = false;
        if (!waitExistingTasksToFinish()) {
            logger.warn("Tasks did not finish in {} seconds for queue {}, proceeding with shutdown",
                    this.shutdownTimeout.getSeconds(), this.pollingEndpointName);
            this.pollingFutures.forEach(pollingFuture -> pollingFuture.cancel(true));
        }
        doStop();
        this.acknowledgmentProcessor.stop();
        logger.debug("{} for queue {} stopped", getClass().getSimpleName(), this.pollingEndpointName);
    }
}

You can set your log level to TRACE on these classes to have a clear picture of what exactly happened to a specific message.
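
For example, something like this in application.properties - the package names are assumed from the Spring Cloud AWS 3.x source layout, so double-check them against the version you're using:

# Assumed packages for the acknowledgement processor and polling message source classes
logging.level.io.awspring.cloud.sqs.listener.acknowledgement=TRACE
logging.level.io.awspring.cloud.sqs.listener.source=TRACE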

Overall, batching acks offers only weak guarantees that all messages will necessarily be acknowledged - acknowledgement might fail, for example.

If you need strong guarantees, you should probably use the ImmediateAcknowledgement processor to acknowledge messages right after they're processed, and also add an AcknowledgementResultCallback to act on failures.
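
For illustration, a factory along these lines - just a sketch: the builder and options method names (acknowledgementResultCallback, acknowledgementInterval, acknowledgementThreshold) are taken from the 3.x reference docs and not verified against 3.0.2, and imports are omitted as in the samples above. Setting the interval to Duration.ZERO and the threshold to 0 is what switches the container to immediate acknowledgement:

@Bean("sqsImmediateAckFactory")
SqsMessageListenerContainerFactory<Object> sqsImmediateAckFactory(SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory.<Object>builder()
            .configure(options -> options
                    // ON_SUCCESS acks right after the listener returns; keep MANUAL if you
                    // prefer calling acknowledge yourself as in the sample above
                    .acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
                    // interval ZERO + threshold 0 -> immediate (non-batched) acknowledgement
                    .acknowledgementInterval(Duration.ZERO)
                    .acknowledgementThreshold(0))
            .acknowledgementResultCallback(new AcknowledgementResultCallback<Object>() {
                @Override
                public void onFailure(Collection<Message<Object>> messages, Throwable t) {
                    // act on acknowledgement failures, e.g. log / alert so duplicate
                    // processing can be detected and handled
                    log.error("Acknowledgement failed for {} messages", messages.size(), t);
                }
            })
            .sqsAsyncClient(sqsAsyncClient)
            .build();
}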

Does this make sense to you? Thanks.

@tomazfernandes tomazfernandes added the status: waiting-for-feedback Waiting for feedback from issuer label Jan 5, 2024
@MarcelLorke6666

MarcelLorke6666 commented Jan 26, 2024

I'm also experiencing this issue now: a message is added via BatchingAcknowledgementProcessor#doOnAcknowledge, but the application is already in the process of shutting down, so the message is never moved from acks to acksBuffer. This, however, means that even though we still wait 10 seconds for the acknowledgements to finish, the entries in acks will never be acknowledged.

In my opinion it would make sense to try to acknowledge all events that have been received, as I don't see why we can only have a weak guarantee here. Also, if the defaults allow a situation where a message is processed successfully on the application side but is not necessarily acknowledged in SQS, then either the documentation should be updated to reflect that important information (imho) or the default should ensure a strong guarantee.

@tomazfernandes
Contributor

tomazfernandes commented Jan 28, 2024

Hey @MarcelLorke6666, thanks for your report.

I should have time to look into this this week.

as I don't see why we can only have a weak guarantee here.

Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this racing condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.

or the default should ensure a strong guarantee

FIFO queues have strong guarantees - they guarantee that a message will be delivered exactly once and in order within a timeframe, if both producer and consumer are correctly configured. The framework also respects that by defaulting to IMMEDIATE (sync) acknowledgement for FIFO queues, and gives the user tools to handle situations where acknowledgement fails, to avoid duplicate / out-of-order processing.

Defaulting Standard SQS queues to IMMEDIATE acknowledgement, though, would incur a significant performance and cost hit while doing really nothing for the user in terms of guarantees - though users can configure that if they so wish.

@mlork

mlork commented Jan 29, 2024

Thank you for taking the time!

You are right, and we generally have measures in place too. In our specific scenario, the deduplication logic had already evicted this event, and we were surprised to see it again a few hours later, since from the application's perspective everything had gone just fine.

@mgrundie-r7

Thanks for the info in previous comments.

We are also experiencing this issue. Messages are being received multiple times (max receive == 2), completing successfully, but never being acknowledged. This results in the maxReceiveCount being hit and messages going to the DLQ for our service, which triggers false alerts for service issues. This is actually quite likely to happen in k8s environments, where the scheduler frequently stops pods and your message is unlucky enough to be received each time by a pod that is about to be shut down.

Standard SQS queues provide an at-least-once guarantee, which the framework respects. So while fixing this racing condition might minimize duplicate processing at application shutdown, it would not change the guarantee itself.

You could argue that, since this issue breaks message acknowledgements, the at-least-once guarantee has been broken.

We are going to try the immediate acknowledge as a workaround for now.

@tomazfernandes
Contributor

Thanks @mgrundie-r7, let us know how that works for you.

I'm looking into another ack issue and should be able to look into this one this weekend or so.

It's a good thing that with increased usage issues are surfacing so we can fix them.

We'll get there :)

@tomazfernandes tomazfernandes added type: bug Something isn't working and removed status: waiting-for-feedback Waiting for feedback from issuer labels Feb 7, 2024
@tomazfernandes
Contributor

Hey everyone, I haven't had the time to work on this yet.

The fix itself doesn't look too complex, but it should be well tested so we don't have a regression later.

If anyone would like to contribute a PR for this I'd be happy to review and merge it.

Thanks.

@tomazfernandes
Contributor

Hey everyone, finally had the time for this fix. There are no other acknowledgement problems on the radar, so hopefully all is well now 🙌🏼

Here's the PR with the fix

Reviews are appreciated, and feel free to check out the branch and test it if you'd like.

Thanks!

@tomazfernandes
Contributor

Fixed in #1082.
