From 0ff200b2f126c610735bb7f6affee34d2d8b9aa0 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 12 Jun 2024 13:31:00 +0200 Subject: [PATCH] Trigger cancellation on context close for non-managed objects only Specifically for prototype/scoped beans and FactoryBean-exposed objects. Closes gh-33009 --- .../ScheduledAnnotationBeanPostProcessor.java | 42 +++-- .../annotation/EnableSchedulingTests.java | 176 ++++++++++++++++-- ...duledAnnotationBeanPostProcessorTests.java | 9 +- 3 files changed, 195 insertions(+), 32 deletions(-) diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java index 586d0cf87245..8d3b9d735da0 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java @@ -44,6 +44,7 @@ import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; +import org.springframework.beans.factory.config.SingletonBeanRegistry; import org.springframework.beans.factory.support.MergedBeanDefinitionPostProcessor; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.context.ApplicationContext; @@ -155,6 +156,8 @@ public class ScheduledAnnotationBeanPostProcessor private final Map> reactiveSubscriptions = new IdentityHashMap<>(16); + private final Set manualCancellationOnContextClose = Collections.newSetFromMap(new IdentityHashMap<>(16)); + /** * Create a default {@code ScheduledAnnotationBeanPostProcessor}. @@ -305,6 +308,12 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } + if ((this.beanFactory != null && !this.beanFactory.isSingleton(beanName)) || + (this.beanFactory instanceof SingletonBeanRegistry sbr && sbr.containsSingleton(beanName))) { + // Either a prototype/scoped bean or a FactoryBean with a pre-existing managed singleton + // -> trigger manual cancellation when ContextClosedEvent comes in + this.manualCancellationOnContextClose.add(bean); + } } } return bean; @@ -595,6 +604,18 @@ public Set getScheduledTasks() { @Override public void postProcessBeforeDestruction(Object bean, String beanName) { + cancelScheduledTasks(bean); + this.manualCancellationOnContextClose.remove(bean); + } + + @Override + public boolean requiresDestruction(Object bean) { + synchronized (this.scheduledTasks) { + return (this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean)); + } + } + + private void cancelScheduledTasks(Object bean) { Set tasks; List liveSubscriptions; synchronized (this.scheduledTasks) { @@ -613,13 +634,6 @@ public void postProcessBeforeDestruction(Object bean, String beanName) { } } - @Override - public boolean requiresDestruction(Object bean) { - synchronized (this.scheduledTasks) { - return (this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean)); - } - } - @Override public void destroy() { synchronized (this.scheduledTasks) { @@ -636,7 +650,10 @@ public void destroy() { liveSubscription.run(); // equivalent to cancelling the subscription } } + this.reactiveSubscriptions.clear(); + this.manualCancellationOnContextClose.clear(); } + this.registrar.destroy(); if (this.localScheduler != null) { this.localScheduler.destroy(); @@ -659,15 +676,10 @@ public void onApplicationEvent(ApplicationContextEvent event) { finishRegistration(); } else if (event instanceof ContextClosedEvent) { - synchronized (this.scheduledTasks) { - Collection> allTasks = this.scheduledTasks.values(); - for (Set tasks : allTasks) { - for (ScheduledTask task : tasks) { - // At this early point, let in-progress tasks complete still - task.cancel(false); - } - } + for (Object bean : this.manualCancellationOnContextClose) { + cancelScheduledTasks(bean); } + this.manualCancellationOnContextClose.clear(); } } } diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java index ace3d2c3b827..0334d066b2e1 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java @@ -21,18 +21,24 @@ import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import jakarta.annotation.PreDestroy; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.core.task.TaskExecutor; import org.springframework.core.testfixture.EnabledForTestGroups; @@ -59,12 +65,20 @@ class EnableSchedulingTests { private AnnotationConfigApplicationContext ctx; + private static final AtomicBoolean shutdownFailure = new AtomicBoolean(); + + + @BeforeEach + void reset() { + shutdownFailure.set(false); + } @AfterEach void tearDown() { if (ctx != null) { ctx.close(); } + assertThat(shutdownFailure).isFalse(); } @@ -75,7 +89,7 @@ void tearDown() { @ParameterizedTest @ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class}) @EnabledForTestGroups(LONG_RUNNING) - public void withFixedRateTask(Class configClass) throws InterruptedException { + void withFixedRateTask(Class configClass) throws InterruptedException { ctx = new AnnotationConfigApplicationContext(configClass); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(2); @@ -92,7 +106,7 @@ public void withFixedRateTask(Class configClass) throws InterruptedException @ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class}) @Timeout(2) // should actually complete within 1s @EnabledForTestGroups(LONG_RUNNING) - public void withExplicitScheduler(Class configClass) throws InterruptedException { + void withExplicitScheduler(Class configClass) throws InterruptedException { ctx = new AnnotationConfigApplicationContext(configClass); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); @@ -147,7 +161,7 @@ void withExplicitSchedulerAmbiguity_andSchedulingEnabled() { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withExplicitScheduledTaskRegistrar() throws InterruptedException { + void withExplicitScheduledTaskRegistrar() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(ExplicitScheduledTaskRegistrarConfig.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); @@ -158,7 +172,7 @@ public void withExplicitScheduledTaskRegistrar() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withQualifiedScheduler() throws InterruptedException { + void withQualifiedScheduler() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfig.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); @@ -169,7 +183,7 @@ public void withQualifiedScheduler() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withQualifiedSchedulerAndPlaceholder() throws InterruptedException { + void withQualifiedSchedulerAndPlaceholder() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfigWithPlaceholder.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); @@ -181,7 +195,7 @@ public void withQualifiedSchedulerAndPlaceholder() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withQualifiedSchedulerWithFixedDelayTask() throws InterruptedException { + void withQualifiedSchedulerWithFixedDelayTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfigWithFixedDelayTask.class); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); @@ -204,7 +218,7 @@ void withAmbiguousTaskSchedulers_andSingleTask() { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedByScheduledTaskRegistrarBean() throws InterruptedException { + void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedByScheduledTaskRegistrarBean() throws InterruptedException { ctx = new AnnotationConfigApplicationContext( SchedulingEnabled_withAmbiguousTaskSchedulers_andSingleTask_disambiguatedByScheduledTaskRegistrar.class); @@ -214,7 +228,7 @@ public void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedByScheduledTa @Test @EnabledForTestGroups(LONG_RUNNING) - public void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedBySchedulerNameAttribute() throws InterruptedException { + void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedBySchedulerNameAttribute() throws InterruptedException { ctx = new AnnotationConfigApplicationContext( SchedulingEnabled_withAmbiguousTaskSchedulers_andSingleTask_disambiguatedBySchedulerNameAttribute.class); @@ -224,7 +238,7 @@ public void withAmbiguousTaskSchedulers_andSingleTask_disambiguatedBySchedulerNa @Test @EnabledForTestGroups(LONG_RUNNING) - public void withTaskAddedVia_configureTasks() throws InterruptedException { + void withTaskAddedVia_configureTasks() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(SchedulingEnabled_withTaskAddedVia_configureTasks.class); Thread.sleep(110); @@ -233,7 +247,7 @@ public void withTaskAddedVia_configureTasks() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withInitiallyDelayedFixedRateTask() throws InterruptedException { + void withInitiallyDelayedFixedRateTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(FixedRateTaskConfig_withInitialDelay.class); Thread.sleep(1950); @@ -246,7 +260,7 @@ public void withInitiallyDelayedFixedRateTask() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withInitiallyDelayedFixedDelayTask() throws InterruptedException { + void withInitiallyDelayedFixedDelayTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(FixedDelayTaskConfig_withInitialDelay.class); Thread.sleep(1950); @@ -259,7 +273,35 @@ public void withInitiallyDelayedFixedDelayTask() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withOneTimeTask() throws InterruptedException { + void withPrototypeContainedFixedDelayTask() throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(FixedDelayTaskConfig_withPrototypeBean.class); + + ctx.getBean(PrototypeBeanWithScheduled.class); + Thread.sleep(1950); + AtomicInteger counter = ctx.getBean(AtomicInteger.class); + + // The @Scheduled method should have been called several times + // but not more times than the delay allows. + assertThat(counter.get()).isBetween(1, 5); + } + + @Test + @EnabledForTestGroups(LONG_RUNNING) + void withPrototypeFactoryContainedFixedDelayTask() throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(FixedDelayTaskConfig_withFactoryBean.class); + + ctx.getBean(PrototypeBeanWithScheduled.class); + Thread.sleep(1950); + AtomicInteger counter = ctx.getBean(AtomicInteger.class); + + // The @Scheduled method should have been called several times + // but not more times than the delay allows. + assertThat(counter.get()).isBetween(1, 5); + } + + @Test + @EnabledForTestGroups(LONG_RUNNING) + void withOneTimeTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(OneTimeTaskConfig.class); Thread.sleep(110); @@ -271,7 +313,7 @@ public void withOneTimeTask() throws InterruptedException { @Test @EnabledForTestGroups(LONG_RUNNING) - public void withTriggerTask() throws InterruptedException { + void withTriggerTask() throws InterruptedException { ctx = new AnnotationConfigApplicationContext(TriggerTaskConfig.class); Thread.sleep(110); @@ -677,6 +719,9 @@ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { @EnableScheduling static class FixedRateTaskConfig_withInitialDelay { + @Autowired + ScheduledAnnotationBeanPostProcessor bpp; + @Bean public AtomicInteger counter() { return new AtomicInteger(); @@ -687,6 +732,13 @@ public void task() throws InterruptedException { counter().incrementAndGet(); Thread.sleep(100); } + + @PreDestroy + public void validateLateCancellation() { + if (this.bpp.getScheduledTasks().isEmpty()) { + shutdownFailure.set(true); + } + } } @@ -694,6 +746,9 @@ public void task() throws InterruptedException { @EnableScheduling static class FixedDelayTaskConfig_withInitialDelay { + @Autowired + ScheduledAnnotationBeanPostProcessor bpp; + @Bean public AtomicInteger counter() { return new AtomicInteger(); @@ -704,6 +759,101 @@ public void task() throws InterruptedException { counter().incrementAndGet(); Thread.sleep(100); } + + @PreDestroy + public void validateLateCancellation() { + if (this.bpp.getScheduledTasks().isEmpty()) { + shutdownFailure.set(true); + } + } + } + + + @Configuration + @EnableScheduling + static class FixedDelayTaskConfig_withPrototypeBean { + + @Autowired + ScheduledAnnotationBeanPostProcessor bpp; + + @Bean + public AtomicInteger counter() { + return new AtomicInteger(); + } + + @Bean @Scope("prototype") + public PrototypeBeanWithScheduled prototypeBean() { + return new PrototypeBeanWithScheduled(counter()); + } + + @PreDestroy + public void validateEarlyCancellation() { + if (!this.bpp.getScheduledTasks().isEmpty()) { + shutdownFailure.set(true); + } + } + } + + + @Configuration + @EnableScheduling + static class FixedDelayTaskConfig_withFactoryBean { + + @Autowired + ScheduledAnnotationBeanPostProcessor bpp; + + @Bean + public AtomicInteger counter() { + return new AtomicInteger(); + } + + @Bean + public FactoryBeanForScheduled prototypeBean() { + return new FactoryBeanForScheduled(counter()); + } + + @PreDestroy + public void validateEarlyCancellation() { + if (!this.bpp.getScheduledTasks().isEmpty()) { + shutdownFailure.set(true); + } + } + } + + + static class PrototypeBeanWithScheduled { + + private AtomicInteger counter; + + public PrototypeBeanWithScheduled(AtomicInteger counter) { + this.counter = counter; + } + + @Scheduled(initialDelay = 1000, fixedDelay = 100) + public void task() throws InterruptedException { + this.counter.incrementAndGet(); + Thread.sleep(100); + } + } + + + static class FactoryBeanForScheduled implements FactoryBean { + + private AtomicInteger counter; + + public FactoryBeanForScheduled(AtomicInteger counter) { + this.counter = counter; + } + + @Override + public PrototypeBeanWithScheduled getObject() { + return new PrototypeBeanWithScheduled(this.counter); + } + + @Override + public Class getObjectType() { + return PrototypeBeanWithScheduled.class; + } } diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java index 07ee096636a9..65626d56e1fb 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -352,9 +352,6 @@ void cronTaskWithZone() { assertThat(condition).isTrue(); CronTrigger cronTrigger = (CronTrigger) trigger; ZonedDateTime dateTime = ZonedDateTime.of(2013, 4, 15, 4, 0, 0, 0, ZoneId.of("GMT+10")); -// Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT+10")); -// cal.clear(); -// cal.set(2013, 3, 15, 4, 0); // 15-04-2013 4:00 GMT+10; Instant lastScheduledExecution = dateTime.toInstant(); Instant lastActualExecution = dateTime.toInstant(); dateTime = dateTime.plusMinutes(30); @@ -1026,6 +1023,7 @@ void fixedDelay() { } } + static class PropertyPlaceholderWithFixedDelayInSeconds { @Scheduled(fixedDelayString = "${fixedDelay}", initialDelayString = "${initialDelay}", timeUnit = TimeUnit.SECONDS) @@ -1041,6 +1039,7 @@ void fixedRate() { } } + static class PropertyPlaceholderWithFixedRateInSeconds { @Scheduled(fixedRateString = "${fixedRate}", initialDelayString = "${initialDelay}", timeUnit = TimeUnit.SECONDS) @@ -1071,9 +1070,11 @@ void y() { } } + @Retention(RetentionPolicy.RUNTIME) @ConvertWith(NameToClass.Converter.class) private @interface NameToClass { + class Converter implements ArgumentConverter { @Override public Class convert(Object beanClassName, ParameterContext context) throws ArgumentConversionException {