diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c3172d7..5e21781 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -19,7 +19,7 @@ jobs: name: Build & push Docker image with: image: kafka-retry-job - tags: 1.12.2, latest + tags: 1.12.3, latest registry: ghcr.io username: ${{ secrets.GHCR_USERNAME }} password: ${{ secrets.GHCR_TOKEN }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d192d1e..2a3db68 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ variables: - VERSION: "1.12.2" + VERSION: "1.12.3" DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 125cd58..5e85107 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,5 +1,14 @@ # Changelog +## [1.12.3](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) + +- [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests +- [\#35](https://github.com/Trendyol/kafka-retry-job/issues/35) Use Builder pattern to read configurations + +**Merged pull requests:** + +- Pull Request for the issues #15 [\#35](https://github.com/Trendyol/kafka-retry-job/pull/38) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz)) + ## [1.12.2](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03) **Closed issues:** diff --git a/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs new file mode 100644 index 0000000..64b1223 --- /dev/null +++ b/src/Helpers/KafkaConfigs/ClientConfigBuilder.cs @@ -0,0 +1,96 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ClientConfigBuilder +{ + public static ClientConfig WithBootstrapServers(this ClientConfig config, string bootstrapServers) + { + if (!string.IsNullOrWhiteSpace(bootstrapServers)) + { + config.BootstrapServers = bootstrapServers; + } + return config; + } + + public static ClientConfig WithSaslUsername(this ClientConfig config, string username) + { + if (!string.IsNullOrWhiteSpace(username)) + { + config.SaslUsername = username; + } + return config; + } + + public static ClientConfig WithSaslPassword(this ClientConfig config, string password) + { + if (!string.IsNullOrWhiteSpace(password)) + { + config.SaslPassword = password; + } + return config; + } + + public static ClientConfig WithSslCaLocation(this ClientConfig config, string sslCaLocation) + { + if (!string.IsNullOrWhiteSpace(sslCaLocation)) + { + config.SslCaLocation = sslCaLocation; + } + return config; + } + + public static ClientConfig WithSaslMechanism(this ClientConfig config, SaslMechanism? saslMechanism) + { + if (saslMechanism is not null) + { + config.SaslMechanism = saslMechanism; + } + return config; + } + + public static ClientConfig WithSecurityProtocol(this ClientConfig config, SecurityProtocol? securityProtocol) + { + if (securityProtocol is not null) + { + config.SecurityProtocol = securityProtocol; + } + return config; + } + + public static ClientConfig WithSslKeystorePassword(this ClientConfig config, string sslKeystorePassword) + { + if (!string.IsNullOrWhiteSpace(sslKeystorePassword)) + { + config.SslKeystorePassword = sslKeystorePassword; + } + return config; + } + + public static ClientConfig WithClientId(this ClientConfig config, string clientId) + { + if (!string.IsNullOrWhiteSpace(clientId)) + { + config.ClientId = clientId; + } + return config; + } + + public static ClientConfig WithMessageMaxBytes(this ClientConfig config, int? messageMaxBytes) + { + if (messageMaxBytes is not null) + { + config.MessageMaxBytes = messageMaxBytes; + } + return config; + } + + public static ClientConfig WithAcks(this ClientConfig config, Acks? acks) + { + if (acks is not null) + { + config.Acks = acks; + } + return config; + } +} \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs new file mode 100644 index 0000000..6bc12a4 --- /dev/null +++ b/src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs @@ -0,0 +1,42 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ConsumerConfigBuilder +{ + public static ConsumerConfig WithAutoOffsetReset(this ConsumerConfig config, AutoOffsetReset? autoOffsetReset) + { + if (autoOffsetReset is not null) + { + config.AutoOffsetReset = autoOffsetReset; + } + return config; + } + + public static ConsumerConfig WithGroupId(this ConsumerConfig config, string groupId) + { + if (!string.IsNullOrWhiteSpace(groupId)) + { + config.GroupId = groupId; + } + return config; + } + + public static ConsumerConfig WithEnableAutoCommit(this ConsumerConfig config, bool? autoCommit) + { + if (autoCommit is not null) + { + config.EnableAutoCommit = autoCommit; + } + return config; + } + + public static ConsumerConfig WithEnableAutoOffsetStore(this ConsumerConfig config, bool? autoOffsetStore) + { + if (autoOffsetStore is not null) + { + config.EnableAutoOffsetStore = autoOffsetStore; + } + return config; + } +} \ No newline at end of file diff --git a/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs new file mode 100644 index 0000000..c977f8d --- /dev/null +++ b/src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs @@ -0,0 +1,48 @@ +using Confluent.Kafka; + +namespace KafkaRetry.Job.Helpers.KafkaConfigs; + +public static class ProducerConfigBuilder +{ + public static ProducerConfig WithEnableIdempotence(this ProducerConfig config, bool? idempotence) + { + if (idempotence is not null) + { + config.EnableIdempotence = idempotence; + } + return config; + } + + public static ProducerConfig WithBatchSize(this ProducerConfig config, int? batchSize) + { + if (batchSize is not null) + { + config.BatchSize = batchSize; + } + return config; + } + + public static ProducerConfig WithLingerMs(this ProducerConfig config, double? lingerMs) + { + if (lingerMs is not null) + { + config.LingerMs = lingerMs; + } + return config; + } + + public static ProducerConfig WithMessageTimeoutMs(this ProducerConfig config, int? messageTimeoutMs) + { + if (messageTimeoutMs is not null) + { + config.MessageTimeoutMs = messageTimeoutMs; + } + return config; + } + + public static ProducerConfig WithRequestTimeoutMs(this ProducerConfig config, int? requestTimeoutMs) + { + config.RequestTimeoutMs = requestTimeoutMs; + return config; + } +} \ No newline at end of file diff --git a/src/Services/Implementations/ConfigurationService.cs b/src/Services/Implementations/ConfigurationService.cs index 4ddeaf7..713dc5e 100644 --- a/src/Services/Implementations/ConfigurationService.cs +++ b/src/Services/Implementations/ConfigurationService.cs @@ -21,11 +21,10 @@ public ConfigurationService(IConfiguration configuration) public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix"); public string RetryTopicNameInHeader => GetValue("RetryTopicNameInHeader"); - public long MessageConsumeLimitPerTopicPartition => - GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; + public long MessageConsumeLimitPerTopicPartition => GetValue("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue; - public bool EnableAutoCommit => GetValue("EnableAutoCommit") ?? false; - public bool EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore") ?? false; + public bool? EnableAutoCommit => GetValue("EnableAutoCommit"); + public bool? EnableAutoOffsetStore => GetValue("EnableAutoOffsetStore"); public string GroupId => GetValueOrThrowInvalidConfigException("GroupId"); public string SaslUsername => GetValue("SaslUsername"); public string SaslPassword => GetValue("SaslPassword"); @@ -33,30 +32,15 @@ public ConfigurationService(IConfiguration configuration) public SaslMechanism? SaslMechanism => GetValue("SaslMechanism"); public string SslKeystorePassword => GetValue("SslKeystorePassword"); public SecurityProtocol? SecurityProtocol => GetValue("SecurityProtocol"); - - public bool EnableIdempotence => GetValue("ProducerEnableIdempotence") ?? - Constants.ProducerConfigDefaults.EnableIdempotence; - + public bool? EnableIdempotence => GetValue("ProducerEnableIdempotence"); public Acks? Acks => GetValue("ProducerAcks"); - - public int BatchSize => - GetValue("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize; - - public string ClientId => - GetValue("ProducerClientId") ?? Constants.ProducerConfigDefaults.ClientId; - - public double LingerMs => - GetValue("ProducerLingerMs") ?? Constants.ProducerConfigDefaults.LingerMs; - - public int MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs") ?? - Constants.ProducerConfigDefaults.MessageTimeoutMs; - - public int RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs") ?? - Constants.ProducerConfigDefaults.RequestTimeoutMs; - - public int MessageMaxBytes => GetValue("ProducerMessageMaxBytes") ?? - Constants.ProducerConfigDefaults.MessageMaxBytes; - + public int? BatchSize => GetValue("ProducerBatchSize"); + public string ClientId => GetValue("ProducerClientId"); + public double? LingerMs => GetValue("ProducerLingerMs"); + public int? MessageTimeoutMs => GetValue("ProducerMessageTimeoutMs"); + public int? RequestTimeoutMs => GetValue("ProducerRequestTimeoutMs"); + public int? MessageMaxBytes => GetValue("ProducerMessageMaxBytes"); + private string GetValueOrThrowInvalidConfigException(string configName) { var configValue = _configuration.GetValue(configName); diff --git a/src/Services/Implementations/KafkaService.cs b/src/Services/Implementations/KafkaService.cs index 67df77c..9560275 100644 --- a/src/Services/Implementations/KafkaService.cs +++ b/src/Services/Implementations/KafkaService.cs @@ -1,5 +1,6 @@ using System; using Confluent.Kafka; +using KafkaRetry.Job.Helpers.KafkaConfigs; using KafkaRetry.Job.Services.Interfaces; namespace KafkaRetry.Job.Services.Implementations; @@ -41,68 +42,61 @@ public IAdminClient BuildAdminClient() return adminClientBuilder.Build(); } + private ClientConfig CreateClientConfig(string bootstrapServers) + { + ClientConfig clientConfig = new ClientConfig() + .WithBootstrapServers(bootstrapServers) + .WithClientId(_configuration.ClientId) + .WithMessageMaxBytes(_configuration.MessageMaxBytes) + .WithSaslUsername(_configuration.SaslUsername) + .WithSaslPassword(_configuration.SaslPassword) + .WithSslCaLocation(_configuration.SslCaLocation) + .WithSaslMechanism(_configuration.SaslMechanism) + .WithSecurityProtocol(_configuration.SecurityProtocol) + .WithSslKeystorePassword(_configuration.SslKeystorePassword) + .WithAcks(_configuration.Acks); + + return clientConfig; + } + private AdminClientConfig CreateAdminClientConfig(string bootstrapServers) { - return new AdminClientConfig - { - BootstrapServers = bootstrapServers, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty - }; + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + return new AdminClientConfig(clientConfig); } private ProducerConfig CreateProducerConfig(string bootstrapServers) { - var producerConfig = new ProducerConfig - { - BootstrapServers = bootstrapServers, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty, - EnableIdempotence = _configuration.EnableIdempotence, - BatchSize = _configuration.BatchSize, - ClientId = _configuration.ClientId, - LingerMs = _configuration.LingerMs, - MessageTimeoutMs = _configuration.MessageTimeoutMs, - RequestTimeoutMs = _configuration.RequestTimeoutMs, - MessageMaxBytes = _configuration.MessageMaxBytes - }; + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + ProducerConfig producerConfig = new ProducerConfig(clientConfig); - if (_configuration.Acks is not null) - { - producerConfig.Acks = _configuration.Acks; - } + producerConfig = producerConfig + .WithEnableIdempotence(_configuration.EnableIdempotence) + .WithBatchSize(_configuration.BatchSize) + .WithLingerMs(_configuration.LingerMs) + .WithMessageTimeoutMs(_configuration.MessageTimeoutMs) + .WithRequestTimeoutMs(_configuration.RequestTimeoutMs); + return producerConfig; } private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId) { - return new ConsumerConfig - { - BootstrapServers = bootstrapServers, - AutoOffsetReset = AutoOffsetReset.Earliest, - GroupId = groupId, - EnableAutoCommit = _configuration.EnableAutoCommit, - SaslUsername = _configuration.SaslUsername ?? string.Empty, - SaslPassword = _configuration.SaslPassword ?? string.Empty, - SslCaLocation = _configuration.SslCaLocation ?? string.Empty, - SaslMechanism = _configuration.SaslMechanism, - SecurityProtocol = _configuration.SecurityProtocol, - SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty, - EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore - }; + ClientConfig clientConfig = CreateClientConfig(bootstrapServers); + ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig); + + consumerConfig = consumerConfig + .WithGroupId(groupId) + .WithAutoOffsetReset(AutoOffsetReset.Earliest) + .WithEnableAutoCommit(_configuration.EnableAutoCommit) + .WithEnableAutoOffsetStore(_configuration.EnableAutoOffsetStore); + + return consumerConfig; } public Action, ConsumeResult> GetConsumerCommitStrategy() { - return _configuration.EnableAutoCommit ? + return _configuration.EnableAutoCommit is true ? (assignedConsumer, result) => { assignedConsumer.StoreOffset(result); diff --git a/tests/Containers/KafkaContainer.cs b/tests/Containers/KafkaContainer.cs index 0c430aa..261c84c 100644 --- a/tests/Containers/KafkaContainer.cs +++ b/tests/Containers/KafkaContainer.cs @@ -1,17 +1,15 @@ using System; using System.IO; using System.Threading.Tasks; -using DotNet.Testcontainers.Containers.Builders; -using DotNet.Testcontainers.Containers.Modules; -using DotNet.Testcontainers.Containers.OutputConsumers; -using DotNet.Testcontainers.Containers.WaitStrategies; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; namespace KafkaRetry.Job.Tests.Containers { public class KafkaContainer { private const int Port = 9092; - private readonly TestcontainersContainer _container; + private readonly IContainer _container; private readonly Stream _outStream = new MemoryStream(); private readonly Stream _errorStream = new MemoryStream(); @@ -24,7 +22,7 @@ public KafkaContainer(string zookeeperAddress) dockerHost = "unix:/var/run/docker.sock"; } - _container = new TestcontainersBuilder() + _container = new ContainerBuilder() .WithDockerEndpoint(dockerHost) .WithImage("confluentinc/cp-kafka:6.0.1") .WithExposedPort(Port) diff --git a/tests/Containers/ZookeeperContainer.cs b/tests/Containers/ZookeeperContainer.cs index 3768f39..ddae5df 100644 --- a/tests/Containers/ZookeeperContainer.cs +++ b/tests/Containers/ZookeeperContainer.cs @@ -1,8 +1,7 @@ using System; using System.Threading.Tasks; -using DotNet.Testcontainers.Containers.Builders; -using DotNet.Testcontainers.Containers.Modules; -using DotNet.Testcontainers.Containers.WaitStrategies; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Containers; using DotNet.Testcontainers.Images; namespace KafkaRetry.Job.Tests.Containers @@ -10,7 +9,7 @@ namespace KafkaRetry.Job.Tests.Containers public class ZookeeperContainer { private const int Port = 2181; - private readonly TestcontainersContainer _container; + private readonly IContainer _container; public string Address => $"{_container.IpAddress}:{Port}"; @@ -22,7 +21,7 @@ public ZookeeperContainer() dockerHost = "unix:/var/run/docker.sock"; } - _container = new TestcontainersBuilder() + _container = new ContainerBuilder() .WithDockerEndpoint(dockerHost) .WithImage(new DockerImage("zookeeper")) .WithExposedPort(Port) diff --git a/tests/kafka.retry.job.tests.csproj b/tests/kafka.retry.job.tests.csproj index 3e05128..450fd94 100644 --- a/tests/kafka.retry.job.tests.csproj +++ b/tests/kafka.retry.job.tests.csproj @@ -14,7 +14,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - +