diff --git a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala index 90b9f5f..9f7d02c 100644 --- a/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala +++ b/src/main/scala/uk/sky/fs2/kafka/topicloader/TopicLoader.scala @@ -72,6 +72,13 @@ trait TopicLoader { topics: NonEmptyList[String], consumerSettings: ConsumerSettings[F, K, V] )(onLoad: Resource.ExitCase => F[Unit]): Stream[F, ConsumerRecord[K, V]] = { + def preLoad( + logOffsets: NonEmptyMap[TopicPartition, LogOffsets] + )(using Logger[F]): Stream[F, ConsumerRecord[K, V]] = for { + preloadConsumer <- KafkaConsumer.stream(consumerSettings) + record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) + } yield record + def postLoad( logOffsets: NonEmptyMap[TopicPartition, LogOffsets] )(using Logger[F]): Stream[F, ConsumerRecord[K, V]] = @@ -84,10 +91,14 @@ trait TopicLoader { for { given Logger[F] <- Stream.eval(LoggerFactory[F].create) - preloadConsumer <- KafkaConsumer.stream(consumerSettings) - logOffsets <- Stream.eval(logOffsetsForTopics(topics, LoadAll, preloadConsumer)).flatMap(Stream.fromOption(_)) + maybeLogOffsets <- Stream + .eval( + // Destroy the consumer after calculating the offsets + KafkaConsumer.resource(consumerSettings).use(logOffsetsForTopics(topics, LoadAll, _)) + ) + logOffsets <- Stream.fromOption(maybeLogOffsets) _ <- Stream.eval(info"log offsets: ${logOffsets.show}") - record <- load(logOffsets, preloadConsumer).onFinalizeCase(onLoad) ++ postLoad(logOffsets) + record <- preLoad(logOffsets) ++ postLoad(logOffsets) } yield record } @@ -97,9 +108,10 @@ trait TopicLoader { consumer: KafkaConsumer[F, K, V] ): Stream[F, ConsumerRecord[K, V]] = for { - logOffsets <- Stream.eval(logOffsetsForTopics(topics, strategy, consumer)).flatMap(Stream.fromOption(_)) - _ <- Stream.eval(info"log offsets: ${logOffsets.show}") - record <- load(logOffsets, consumer) + maybeLogOffsets <- Stream.eval(logOffsetsForTopics(topics, strategy, consumer)) + logOffsets <- Stream.fromOption(maybeLogOffsets) + _ <- Stream.eval(info"log offsets: ${logOffsets.show}") + record <- load(logOffsets, consumer) } yield record private def load[F[_] : Async : Logger, K, V](