diff --git a/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java b/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java index d7c16fa..7d4e9cb 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java @@ -9,10 +9,7 @@ import lombok.NoArgsConstructor; import lombok.ToString; import lombok.experimental.SuperBuilder; -import software.amazon.awssdk.retries.api.BackoffStrategy; -import software.amazon.awssdk.retries.api.RetryStrategy; -import software.amazon.awssdk.retries.internal.BaseRetryStrategy; -import software.amazon.awssdk.retries.internal.DefaultStandardRetryStrategy; +import software.amazon.awssdk.awscore.retry.AwsRetryStrategy; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; import software.amazon.awssdk.services.sqs.SqsClient; @@ -53,14 +50,11 @@ protected SqsAsyncClient asyncClient(final RunContext runContext, ); clientBuilder = clientBuilder.overrideConfiguration(builder -> - builder.retryStrategy(DefaultStandardRetryStrategy - .builder() - .maxAttempts(retryMaxAttempts) - .backoffStrategy(BackoffStrategy.exponentialDelay( - RETRY_STRATEGY_BACKOFF_BASE_DELAY, - RETRY_STRATEGY_BACKOFF_MAX_DELAY - )) - .build() + builder.retryStrategy( + AwsRetryStrategy.standardRetryStrategy() + .toBuilder() + .maxAttempts(retryMaxAttempts) + .build() ) ); return clientBuilder.build(); diff --git a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java index 5feadaa..96b434d 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java @@ -5,8 +5,12 @@ import io.kestra.plugin.aws.AbstractLocalStackTest; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; +import org.junit.jupiter.api.BeforeEach; import org.testcontainers.containers.localstack.LocalStackContainer; import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; @@ -20,13 +24,24 @@ public class AbstractSqsTest extends AbstractLocalStackTest { @Inject protected StorageInterface storageInterface; + @BeforeEach + void beforeEach() { + try(SqsClient sqsClient = SqsClient + .builder() + .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS)) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()) + )) + .build()) { + if (!sqsClient.listQueues().queueUrls().contains(queueUrl())) { + sqsClient.createQueue(CreateQueueRequest.builder().queueName("test-queue").build()); + } + } + } + String queueUrl() { return localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString() + "/000000000000/test-queue"; } - void createQueue(SqsClient client) { - if (!client.listQueues().queueUrls().contains(queueUrl())) { - client.createQueue(CreateQueueRequest.builder().queueName("test-queue").build()); - } - } } diff --git a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java index 1ef9fb7..b315ef1 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java @@ -2,7 +2,6 @@ import io.kestra.plugin.aws.sqs.model.Message; import io.kestra.plugin.aws.sqs.model.SerdeType; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -11,7 +10,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267") class PublishThenConsumeTest extends AbstractSqsTest { @Test void runText() throws Exception { @@ -31,9 +29,6 @@ void runText() throws Exception { ) .build(); - var client = publish.client(runContext); - createQueue(client); - var publishOutput = publish.run(runContext); assertThat(publishOutput.getMessagesCount(), is(2)); @@ -70,9 +65,6 @@ void runJson() throws Exception { ) .build(); - var client = publish.client(runContext); - createQueue(client); - var publishOutput = publish.run(runContext); assertThat(publishOutput.getMessagesCount(), is(2)); diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index 5d3bc3b..4efc860 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -14,7 +14,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Named; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import reactor.core.publisher.Flux; @@ -28,7 +27,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267") class RealtimeTriggerTest extends AbstractSqsTest { @Inject private ApplicationContext applicationContext; @@ -85,8 +83,6 @@ void flow() throws Exception { .build(); var runContext = runContextFactory.of(); - var client = task.client(runContext); - createQueue(client); task.run(runContext); diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index 24b864a..365a40e 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -14,7 +14,6 @@ import io.micronaut.context.ApplicationContext; import jakarta.inject.Inject; import jakarta.inject.Named; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.localstack.LocalStackContainer; import reactor.core.publisher.Flux; @@ -29,7 +28,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267") class TriggerTest extends AbstractSqsTest { @Inject private ApplicationContext applicationContext; @@ -88,8 +86,6 @@ void flow() throws Exception { .build(); var runContext = runContextFactory.of(); - var client = task.client(runContext); - createQueue(client); task.run(runContext);