Skip to content

Commit

Permalink
Merge pull request #38 from ahmetfurkankavraz/main
Browse files Browse the repository at this point in the history
Pull Request for the issues #15 #35
  • Loading branch information
MehmetFiratKomurcu authored Mar 26, 2024
2 parents 869e144 + b8d92aa commit a8f0743
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
name: Build & push Docker image
with:
image: kafka-retry-job
tags: 1.12.2, latest
tags: 1.12.3, latest
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

variables:
VERSION: "1.12.2"
VERSION: "1.12.3"
DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION
DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH

Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## [1.12.3](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03)

- [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests
- [\#35](https://github.com/Trendyol/kafka-retry-job/issues/35) Use Builder pattern to read configurations

**Merged pull requests:**

- Pull Request for the issues #15 [\#35](https://github.com/Trendyol/kafka-retry-job/pull/38) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz))

## [1.12.2](https://github.com/github-changelog-generator/github-changelog-generator/tree/1.16.4) (2024-03)

**Closed issues:**
Expand Down
96 changes: 96 additions & 0 deletions src/Helpers/KafkaConfigs/ClientConfigBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using Confluent.Kafka;

namespace KafkaRetry.Job.Helpers.KafkaConfigs;

public static class ClientConfigBuilder
{
public static ClientConfig WithBootstrapServers(this ClientConfig config, string bootstrapServers)
{
if (!string.IsNullOrWhiteSpace(bootstrapServers))
{
config.BootstrapServers = bootstrapServers;
}
return config;
}

public static ClientConfig WithSaslUsername(this ClientConfig config, string username)
{
if (!string.IsNullOrWhiteSpace(username))
{
config.SaslUsername = username;
}
return config;
}

public static ClientConfig WithSaslPassword(this ClientConfig config, string password)
{
if (!string.IsNullOrWhiteSpace(password))
{
config.SaslPassword = password;
}
return config;
}

public static ClientConfig WithSslCaLocation(this ClientConfig config, string sslCaLocation)
{
if (!string.IsNullOrWhiteSpace(sslCaLocation))
{
config.SslCaLocation = sslCaLocation;
}
return config;
}

public static ClientConfig WithSaslMechanism(this ClientConfig config, SaslMechanism? saslMechanism)
{
if (saslMechanism is not null)
{
config.SaslMechanism = saslMechanism;
}
return config;
}

public static ClientConfig WithSecurityProtocol(this ClientConfig config, SecurityProtocol? securityProtocol)
{
if (securityProtocol is not null)
{
config.SecurityProtocol = securityProtocol;
}
return config;
}

public static ClientConfig WithSslKeystorePassword(this ClientConfig config, string sslKeystorePassword)
{
if (!string.IsNullOrWhiteSpace(sslKeystorePassword))
{
config.SslKeystorePassword = sslKeystorePassword;
}
return config;
}

public static ClientConfig WithClientId(this ClientConfig config, string clientId)
{
if (!string.IsNullOrWhiteSpace(clientId))
{
config.ClientId = clientId;
}
return config;
}

public static ClientConfig WithMessageMaxBytes(this ClientConfig config, int? messageMaxBytes)
{
if (messageMaxBytes is not null)
{
config.MessageMaxBytes = messageMaxBytes;
}
return config;
}

public static ClientConfig WithAcks(this ClientConfig config, Acks? acks)
{
if (acks is not null)
{
config.Acks = acks;
}
return config;
}
}
42 changes: 42 additions & 0 deletions src/Helpers/KafkaConfigs/ConsumerConfigBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Confluent.Kafka;

namespace KafkaRetry.Job.Helpers.KafkaConfigs;

public static class ConsumerConfigBuilder
{
public static ConsumerConfig WithAutoOffsetReset(this ConsumerConfig config, AutoOffsetReset? autoOffsetReset)
{
if (autoOffsetReset is not null)
{
config.AutoOffsetReset = autoOffsetReset;
}
return config;
}

public static ConsumerConfig WithGroupId(this ConsumerConfig config, string groupId)
{
if (!string.IsNullOrWhiteSpace(groupId))
{
config.GroupId = groupId;
}
return config;
}

public static ConsumerConfig WithEnableAutoCommit(this ConsumerConfig config, bool? autoCommit)
{
if (autoCommit is not null)
{
config.EnableAutoCommit = autoCommit;
}
return config;
}

public static ConsumerConfig WithEnableAutoOffsetStore(this ConsumerConfig config, bool? autoOffsetStore)
{
if (autoOffsetStore is not null)
{
config.EnableAutoOffsetStore = autoOffsetStore;
}
return config;
}
}
48 changes: 48 additions & 0 deletions src/Helpers/KafkaConfigs/ProducerConfigBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Confluent.Kafka;

namespace KafkaRetry.Job.Helpers.KafkaConfigs;

public static class ProducerConfigBuilder
{
public static ProducerConfig WithEnableIdempotence(this ProducerConfig config, bool? idempotence)
{
if (idempotence is not null)
{
config.EnableIdempotence = idempotence;
}
return config;
}

public static ProducerConfig WithBatchSize(this ProducerConfig config, int? batchSize)
{
if (batchSize is not null)
{
config.BatchSize = batchSize;
}
return config;
}

public static ProducerConfig WithLingerMs(this ProducerConfig config, double? lingerMs)
{
if (lingerMs is not null)
{
config.LingerMs = lingerMs;
}
return config;
}

public static ProducerConfig WithMessageTimeoutMs(this ProducerConfig config, int? messageTimeoutMs)
{
if (messageTimeoutMs is not null)
{
config.MessageTimeoutMs = messageTimeoutMs;
}
return config;
}

public static ProducerConfig WithRequestTimeoutMs(this ProducerConfig config, int? requestTimeoutMs)
{
config.RequestTimeoutMs = requestTimeoutMs;
return config;
}
}
38 changes: 11 additions & 27 deletions src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,26 @@ public ConfigurationService(IConfiguration configuration)
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");

public long MessageConsumeLimitPerTopicPartition =>
GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;
public long MessageConsumeLimitPerTopicPartition => GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;

public bool EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
public bool? EnableAutoCommit => GetValue<bool?>("EnableAutoCommit");
public bool? EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore");
public string GroupId => GetValueOrThrowInvalidConfigException("GroupId");
public string SaslUsername => GetValue<string>("SaslUsername");
public string SaslPassword => GetValue<string>("SaslPassword");
public string SslCaLocation => GetValue<string>("SslCaLocation");
public SaslMechanism? SaslMechanism => GetValue<SaslMechanism?>("SaslMechanism");
public string SslKeystorePassword => GetValue<string>("SslKeystorePassword");
public SecurityProtocol? SecurityProtocol => GetValue<SecurityProtocol?>("SecurityProtocol");

public bool EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence") ??
Constants.ProducerConfigDefaults.EnableIdempotence;

public bool? EnableIdempotence => GetValue<bool?>("ProducerEnableIdempotence");
public Acks? Acks => GetValue<Acks?>("ProducerAcks");

public int BatchSize =>
GetValue<int?>("ProducerBatchSize") ?? Constants.ProducerConfigDefaults.BatchSize;

public string ClientId =>
GetValue<string>("ProducerClientId") ?? Constants.ProducerConfigDefaults.ClientId;

public double LingerMs =>
GetValue<double?>("ProducerLingerMs") ?? Constants.ProducerConfigDefaults.LingerMs;

public int MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs") ??
Constants.ProducerConfigDefaults.MessageTimeoutMs;

public int RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs") ??
Constants.ProducerConfigDefaults.RequestTimeoutMs;

public int MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes") ??
Constants.ProducerConfigDefaults.MessageMaxBytes;

public int? BatchSize => GetValue<int?>("ProducerBatchSize");
public string ClientId => GetValue<string>("ProducerClientId");
public double? LingerMs => GetValue<double?>("ProducerLingerMs");
public int? MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs");
public int? RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs");
public int? MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes");

private string GetValueOrThrowInvalidConfigException(string configName)
{
var configValue = _configuration.GetValue<string>(configName);
Expand Down
86 changes: 40 additions & 46 deletions src/Services/Implementations/KafkaService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using Confluent.Kafka;
using KafkaRetry.Job.Helpers.KafkaConfigs;
using KafkaRetry.Job.Services.Interfaces;

namespace KafkaRetry.Job.Services.Implementations;
Expand Down Expand Up @@ -41,68 +42,61 @@ public IAdminClient BuildAdminClient()
return adminClientBuilder.Build();
}

private ClientConfig CreateClientConfig(string bootstrapServers)
{
ClientConfig clientConfig = new ClientConfig()
.WithBootstrapServers(bootstrapServers)
.WithClientId(_configuration.ClientId)
.WithMessageMaxBytes(_configuration.MessageMaxBytes)
.WithSaslUsername(_configuration.SaslUsername)
.WithSaslPassword(_configuration.SaslPassword)
.WithSslCaLocation(_configuration.SslCaLocation)
.WithSaslMechanism(_configuration.SaslMechanism)
.WithSecurityProtocol(_configuration.SecurityProtocol)
.WithSslKeystorePassword(_configuration.SslKeystorePassword)
.WithAcks(_configuration.Acks);

return clientConfig;
}

private AdminClientConfig CreateAdminClientConfig(string bootstrapServers)
{
return new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty
};
ClientConfig clientConfig = CreateClientConfig(bootstrapServers);
return new AdminClientConfig(clientConfig);
}

private ProducerConfig CreateProducerConfig(string bootstrapServers)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableIdempotence = _configuration.EnableIdempotence,
BatchSize = _configuration.BatchSize,
ClientId = _configuration.ClientId,
LingerMs = _configuration.LingerMs,
MessageTimeoutMs = _configuration.MessageTimeoutMs,
RequestTimeoutMs = _configuration.RequestTimeoutMs,
MessageMaxBytes = _configuration.MessageMaxBytes
};
ClientConfig clientConfig = CreateClientConfig(bootstrapServers);
ProducerConfig producerConfig = new ProducerConfig(clientConfig);

if (_configuration.Acks is not null)
{
producerConfig.Acks = _configuration.Acks;
}
producerConfig = producerConfig
.WithEnableIdempotence(_configuration.EnableIdempotence)
.WithBatchSize(_configuration.BatchSize)
.WithLingerMs(_configuration.LingerMs)
.WithMessageTimeoutMs(_configuration.MessageTimeoutMs)
.WithRequestTimeoutMs(_configuration.RequestTimeoutMs);

return producerConfig;
}

private ConsumerConfig CreateConsumerConfig(string bootstrapServers, string groupId)
{
return new ConsumerConfig
{
BootstrapServers = bootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = groupId,
EnableAutoCommit = _configuration.EnableAutoCommit,
SaslUsername = _configuration.SaslUsername ?? string.Empty,
SaslPassword = _configuration.SaslPassword ?? string.Empty,
SslCaLocation = _configuration.SslCaLocation ?? string.Empty,
SaslMechanism = _configuration.SaslMechanism,
SecurityProtocol = _configuration.SecurityProtocol,
SslKeystorePassword = _configuration.SslKeystorePassword ?? string.Empty,
EnableAutoOffsetStore = _configuration.EnableAutoOffsetStore
};
ClientConfig clientConfig = CreateClientConfig(bootstrapServers);
ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);

consumerConfig = consumerConfig
.WithGroupId(groupId)
.WithAutoOffsetReset(AutoOffsetReset.Earliest)
.WithEnableAutoCommit(_configuration.EnableAutoCommit)
.WithEnableAutoOffsetStore(_configuration.EnableAutoOffsetStore);

return consumerConfig;
}

public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy()
{
return _configuration.EnableAutoCommit ?
return _configuration.EnableAutoCommit is true ?
(assignedConsumer, result) =>
{
assignedConsumer.StoreOffset(result);
Expand Down
Loading

0 comments on commit a8f0743

Please sign in to comment.