Skip to content

Commit

Permalink
Merge pull request #43 from ahmetfurkankavraz/main
Browse files Browse the repository at this point in the history
#29, #30 and #44 change the error catching and message consumption limit mechanism to topic-partition-based and apply topic-based parallel programming to optimize program running time
  • Loading branch information
MehmetFiratKomurcu authored Aug 29, 2024
2 parents f28b0da + a3412ff commit 7084cf5
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 67 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
name: Build & push Docker image
with:
image: kafka-retry-job
tags: 1.12.3, latest
tags: 1.12.4, latest
registry: ghcr.io
username: ${{ secrets.GHCR_USERNAME }}
password: ${{ secrets.GHCR_TOKEN }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

variables:
VERSION: "1.12.3"
VERSION: "1.12.4"
DOCKER_IMAGE_VERSION: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH:$VERSION
DOCKER_IMAGE_LATEST: $GITLAB_REGISTRY_HOST/$CI_PROJECT_PATH

Expand Down
10 changes: 10 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## [1.12.4](https://github.com/Trendyol/kafka-retry-job/tree/1.12.4) (2024-08)

- [\#29](https://github.com/Trendyol/kafka-retry-job/issues/29) Add async programming to consume message in parallel
- [\#30](https://github.com/Trendyol/kafka-retry-job/issues/30) Catch exception based on Partitions
- [\#44](https://github.com/Trendyol/kafka-retry-job/issues/44) Limit message consumption based on Topic instead of Topic Partition

**Merged pull requests:**

- Pull Request for the issues #29, #30, #44 [\#43](https://github.com/Trendyol/kafka-retry-job/pull/43) ([ahmetfurkankavraz](https://github.com/ahmetfurkankavraz))

## [1.12.3](https://github.com/Trendyol/kafka-retry-job/tree/1.12.3) (2024-03)

- [\#15](https://github.com/Trendyol/kafka-retry-job/issues/15) Fix integration tests
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Implementations/ConfigurationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public ConfigurationService(IConfiguration configuration)
public string RetrySuffix => GetValueOrThrowInvalidConfigException("RetrySuffix");
public string RetryTopicNameInHeader => GetValue<string>("RetryTopicNameInHeader");

public long MessageConsumeLimitPerTopicPartition => GetValue<long?>("MessageConsumeLimitPerTopicPartition") ?? Int64.MaxValue;
public long MessageConsumeLimitPerTopic => GetValue<long?>("MessageConsumeLimitPerTopic") ?? Int64.MaxValue;

public bool? EnableAutoCommit => GetValue<bool?>("EnableAutoCommit") ?? false;
public bool? EnableAutoOffsetStore => GetValue<bool?>("EnableAutoOffsetStore") ?? false;
Expand All @@ -40,6 +40,7 @@ public ConfigurationService(IConfiguration configuration)
public int? MessageTimeoutMs => GetValue<int?>("ProducerMessageTimeoutMs");
public int? RequestTimeoutMs => GetValue<int?>("ProducerRequestTimeoutMs");
public int? MessageMaxBytes => GetValue<int?>("ProducerMessageMaxBytes");
public int MaxLevelParallelism => GetValue<int?>("MaxLevelParallelism") ?? 1;

private string GetValueOrThrowInvalidConfigException(string configName)
{
Expand Down
141 changes: 84 additions & 57 deletions src/Services/Implementations/KafkaRetryJobService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaRetry.Job.Services.Interfaces;
Expand All @@ -26,92 +28,112 @@ public KafkaRetryJobService(IKafkaService kafkaService,
public async Task MoveMessages()
{
_logService.LogApplicationStarted();

using var assignedConsumer = _kafkaService.BuildKafkaConsumer();
var adminClient = _kafkaService.BuildAdminClient();
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120));
adminClient.Dispose();

var errorTopicPartitionsWithLag = GetErrorTopicInfosFromCluster(assignedConsumer, metadata);
var errorTopics = errorTopicPartitionsWithLag.Select(p => p.Item1.Topic).Distinct().ToList();
var adminClient = _kafkaService.GetKafkaAdminClient();
var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(120));
_kafkaService.ReleaseKafkaAdminClient(ref adminClient);

var consumer = _kafkaService.GetKafkaConsumer();
var errorTopicsWithLag = GetErrorTopicInfosFromCluster(consumer, metadata);
var errorTopics = errorTopicsWithLag.Keys.ToList();

_logService.LogMatchingErrorTopics(errorTopics);

using var producer = _kafkaService.BuildKafkaProducer();

var utcNow = DateTime.UtcNow;


var consumerCommitStrategy= _kafkaService.GetConsumerCommitStrategy();

try
var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopic;
if (messageConsumeLimit <= 0)
{
_logService.LogMessageConsumeLimitIsZero();
return;
}

var maxDegreeOfParallelism = _configuration.MaxLevelParallelism;
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
var tasks = new List<Task>();

foreach (var (_, topicPartitionsWithLag) in errorTopicsWithLag)
{
var messageConsumeLimit = _configuration.MessageConsumeLimitPerTopicPartition;
if (messageConsumeLimit <= 0)
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await MoveMessagesForTopic(topicPartitionsWithLag, consumerCommitStrategy);
}
finally
{
semaphore.Release();
}
}));
}

await Task.WhenAll(tasks.ToArray());

_logService.LogApplicationIsClosing();
}

private async Task MoveMessagesForTopic(
List<(TopicPartition, long)> topicPartitionsWithLag,
Action<IConsumer<string,string>, ConsumeResult<string,string>> consumerCommitStrategy
)
{
var messageConsumeLimitPerTopic = _configuration.MessageConsumeLimitPerTopic;

foreach (var (topicPartition, lag) in topicPartitionsWithLag)
{
if (lag <= 0)
{
_logService.LogMessageConsumeLimitIsZero();
return;
continue;
}

foreach (var (topicPartition, lag) in errorTopicPartitionsWithLag)
var consumer = _kafkaService.GetKafkaConsumer();
var producer = _kafkaService.GetKafkaProducer();
var errorTopic = topicPartition.Topic;

try
{
if (lag <= 0)
{
continue;
}

var messageConsumeLimitForTopicPartition = messageConsumeLimit;
_logService.LogStartOfSubscribingTopicPartition(topicPartition);

var errorTopic = topicPartition.Topic;

var currentLag = lag;

assignedConsumer.Assign(topicPartition);

while (currentLag > 0 && messageConsumeLimitForTopicPartition > 0)
consumer.Assign(topicPartition);

while (currentLag > 0 && messageConsumeLimitPerTopic > 0)
{
var result = assignedConsumer.Consume(TimeSpan.FromSeconds(3));
var result = consumer.Consume(TimeSpan.FromSeconds(3));

if (result is null)
{
break;
continue;
}

currentLag -= 1;
messageConsumeLimitForTopicPartition -= 1;

var resultDate = result.Message.Timestamp.UtcDateTime;

if (utcNow < resultDate)
{
_logService.LogNewMessageArrived(utcNow);
break;
}
messageConsumeLimitPerTopic -= 1;

result.Message.Timestamp = new Timestamp(DateTime.UtcNow);

var retryTopic = GetRetryTopicName(result, errorTopic);

_logService.LogProducingMessage(result, errorTopic, retryTopic);

await producer.ProduceAsync(retryTopic, result.Message);

consumerCommitStrategy.Invoke(assignedConsumer, result);
consumerCommitStrategy.Invoke(consumer, result);
}

_logService.LogEndOfSubscribingTopicPartition(topicPartition);
}

assignedConsumer.Unassign();

}
catch (Exception e)
{
_logService.LogError(e);
assignedConsumer.Unassign();
throw;
catch (Exception e)
{
_logService.LogError(e);
}
finally
{
consumer.Unassign();
_kafkaService.ReleaseKafkaConsumer(ref consumer);
_kafkaService.ReleaseKafkaProducer(ref producer);
}
}

_logService.LogApplicationIsClosing();
}

private string GetRetryTopicName(ConsumeResult<string,string> result , string errorTopic )
Expand All @@ -122,7 +144,7 @@ private string GetRetryTopicName(ConsumeResult<string,string> result , string er
errorTopic.ReplaceAtEnd(_configuration.ErrorSuffix, _configuration.RetrySuffix);
}

private List<(TopicPartition, long)> GetErrorTopicInfosFromCluster(IConsumer<string, string> assignedConsumer, Metadata metadata)
private IDictionary<string, List<(TopicPartition, long)>> GetErrorTopicInfosFromCluster(IConsumer<string, string> assignedConsumer, Metadata metadata)
{
_logService.LogFetchingErrorTopicInfoStarted();

Expand All @@ -142,7 +164,12 @@ private string GetRetryTopicName(ConsumeResult<string,string> result , string er
var watermark = assignedConsumer.QueryWatermarkOffsets(tpo.TopicPartition, TimeSpan.FromSeconds(5));
var lag = tpo.Offset >= 0 ? watermark.High - tpo.Offset : watermark.High - watermark.Low;
return (tpo.TopicPartition, lag);
}).ToList();
})
.GroupBy(t => t.Item1.Topic)
.ToImmutableDictionary(
t => t.Key,
t => t.ToList()
);

_logService.LogFetchingErrorTopicInfoFinished();

Expand Down
63 changes: 59 additions & 4 deletions src/Services/Implementations/KafkaService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using Confluent.Kafka;
using KafkaRetry.Job.Helpers.KafkaConfigs;
using KafkaRetry.Job.Services.Interfaces;
Expand All @@ -8,13 +9,34 @@ namespace KafkaRetry.Job.Services.Implementations;
public class KafkaService : IKafkaService
{
private readonly ConfigurationService _configuration;
private readonly ConcurrentBag<IConsumer<string, string>> _consumers = new();
private readonly ConcurrentBag<IProducer<string, string>> _producers = new();
private readonly ConcurrentBag<IAdminClient> _adminClients = new();

public KafkaService(ConfigurationService configuration)
{
_configuration = configuration;
}

public IConsumer<string, string> BuildKafkaConsumer()
~KafkaService()
{
while (_consumers.TryTake(out var consumer))
{
consumer.Dispose();
}

while (_producers.TryTake(out var producer))
{
producer.Dispose();
}

while (_adminClients.TryTake(out var adminClient))
{
adminClient.Dispose();
}
}

private IConsumer<string, string> BuildKafkaConsumer()
{
var bootstrapServers = _configuration.BootstrapServers;
var groupId = _configuration.GroupId;
Expand All @@ -24,23 +46,56 @@ public IConsumer<string, string> BuildKafkaConsumer()
return consumerBuilder.Build();
}

public IProducer<string, string> BuildKafkaProducer()
private IProducer<string, string> BuildKafkaProducer()
{
var bootstrapServers = _configuration.BootstrapServers;
var producerConfig = CreateProducerConfig(bootstrapServers);
var producerBuilder = new ProducerBuilder<string, string>(producerConfig);

return producerBuilder.Build();
}

public IAdminClient BuildAdminClient()
private IAdminClient BuildAdminClient()
{
var bootstrapServers = _configuration.BootstrapServers;
var adminClientConfig = CreateAdminClientConfig(bootstrapServers);
var adminClientBuilder = new AdminClientBuilder(adminClientConfig);

return adminClientBuilder.Build();
}

public IConsumer<string, string> GetKafkaConsumer()
{
return _consumers.TryTake(out var consumer) ? consumer : BuildKafkaConsumer();
}

public IProducer<string, string> GetKafkaProducer()
{
return _producers.TryTake(out var producer) ? producer : BuildKafkaProducer();
}

public IAdminClient GetKafkaAdminClient()
{
return _adminClients.TryTake(out var adminClient) ? adminClient : BuildAdminClient();
}

public void ReleaseKafkaConsumer(ref IConsumer<string, string> consumer)
{
_consumers.Add(consumer);
consumer = null;
}

public void ReleaseKafkaProducer(ref IProducer<string, string> producer)
{
_producers.Add(producer);
producer = null;
}

public void ReleaseKafkaAdminClient(ref IAdminClient adminClient)
{
_adminClients.Add(adminClient);
adminClient = null;
}

private ClientConfig CreateClientConfig(string bootstrapServers)
{
Expand Down
9 changes: 6 additions & 3 deletions src/Services/Interfaces/IKafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ namespace KafkaRetry.Job.Services.Interfaces
{
public interface IKafkaService
{
IConsumer<string, string> BuildKafkaConsumer();
IProducer<string, string> BuildKafkaProducer();
IAdminClient BuildAdminClient();
public IConsumer<string, string> GetKafkaConsumer();
public IProducer<string, string> GetKafkaProducer();
public IAdminClient GetKafkaAdminClient();
public void ReleaseKafkaConsumer(ref IConsumer<string, string> consumer);
public void ReleaseKafkaProducer(ref IProducer<string, string> producer);
public void ReleaseKafkaAdminClient(ref IAdminClient adminClient);
public Action<IConsumer<string, string>, ConsumeResult<string, string>> GetConsumerCommitStrategy();
}
}

0 comments on commit 7084cf5

Please sign in to comment.