diff --git a/.gitignore b/.gitignore index 116e2ff..3345226 100644 --- a/.gitignore +++ b/.gitignore @@ -22,5 +22,11 @@ project/plugins/project/ *.iml *.ipr +# VSCode + Metals +.bloop +.metals +.vscode +metals.sbt + # Mac .DS_Store \ No newline at end of file diff --git a/project/build.properties b/project/build.properties index c8fcab5..04267b1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.6.2 +sbt.version=1.9.9 diff --git a/project/plugins.sbt b/project/plugins.sbt index e9a2e88..6341eb5 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.7.2") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.11") addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7") diff --git a/src/main/scala/com/evolutiongaming/cluster/sharding/ShardingStrategy.scala b/src/main/scala/com/evolutiongaming/cluster/sharding/ShardingStrategy.scala index 59098cd..4ccb746 100644 --- a/src/main/scala/com/evolutiongaming/cluster/sharding/ShardingStrategy.scala +++ b/src/main/scala/com/evolutiongaming/cluster/sharding/ShardingStrategy.scala @@ -301,6 +301,46 @@ object ShardingStrategy { } } + /** + * Adds a track of unallocated shards. + * If we configured sharding in such a way that it automatically re-create shards and entities inside of it after a rebalance, + * we know that all shards returned from rebalance call would be allocated in the nearest future. + * Term unallocated shard in this context means such shard that was stopped on a previous node, but not yet allocated on a new one. + */ + object TrackUnallocated { + + def of[F[_] : Sync]( + strategy: ShardingStrategy[F], + ): F[ShardingStrategy[F]] = + for { + unallocatedShards <- Ref[F].of(Set.empty[Shard]) + } yield { + apply(unallocatedShards, strategy) + } + + def apply[F[_] : FlatMap]( + unallocatedShards: Ref[F, Set[Shard]], + strategy: ShardingStrategy[F], + ): ShardingStrategy[F] = { + + new ShardingStrategy[F] { + + def allocate(requester: Region, shard: Shard, current: Allocation) = + for { + region <- strategy.allocate(requester, shard, current) + _ <- unallocatedShards.update(_.filterNot(_ == shard)) + } yield region + + def rebalance(current: Allocation, inProgress: Set[Shard]) = + for { + lastUnallocated <- unallocatedShards.get + shards <- strategy.rebalance(current, inProgress ++ lastUnallocated) + _ <- unallocatedShards.update(_ ++ shards) + } yield shards + } + } + } + implicit class ShardingStrategyOps[F[_]](val self: ShardingStrategy[F]) extends AnyVal { @@ -337,6 +377,8 @@ object ShardingStrategy { ShardRebalanceCooldown.of[F](cooldown, self) } + def withTrackUnallocated(implicit F: Sync[F]): F[ShardingStrategy[F]] = + TrackUnallocated.of(self) def toAllocationStrategy( fallback: Allocate = Allocate.Default)(implicit diff --git a/src/test/scala/com/evolutiongaming/cluster/sharding/AllocationStrategyProxySpec.scala b/src/test/scala/com/evolutiongaming/cluster/sharding/AllocationStrategyProxySpec.scala index 96b7662..2b3e05c 100644 --- a/src/test/scala/com/evolutiongaming/cluster/sharding/AllocationStrategyProxySpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/sharding/AllocationStrategyProxySpec.scala @@ -17,7 +17,7 @@ class AllocationStrategyProxySpec extends AsyncFunSuite with ActorSpec with Matc private val shard = "shard" private val ignore = (msg: () => String) => {msg(); ()} - private implicit val addressOf = AddressOf(actorSystem) + private implicit val addressOf: AddressOf = AddressOf(actorSystem) test("allocate") { val allocation = Map((region, IndexedSeq(shard))) diff --git a/src/test/scala/com/evolutiongaming/cluster/sharding/LoggingStrategySpec.scala b/src/test/scala/com/evolutiongaming/cluster/sharding/LoggingStrategySpec.scala index 86d105a..9f68d2b 100644 --- a/src/test/scala/com/evolutiongaming/cluster/sharding/LoggingStrategySpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/sharding/LoggingStrategySpec.scala @@ -13,7 +13,7 @@ class LoggingStrategySpec extends AnyFunSuite with ActorSpec with Matchers { private val shard = "shard" private val address = Address("", "", "127.0.0.1", 2552) - private implicit val addressOf = AddressOf.const(address) + private implicit val addressOf: AddressOf = AddressOf.const(address) private val log = (msg: () => String) => { StateT { state => diff --git a/src/test/scala/com/evolutiongaming/cluster/sharding/ShardingStrategySpec.scala b/src/test/scala/com/evolutiongaming/cluster/sharding/ShardingStrategySpec.scala index d730668..3b9bb0d 100644 --- a/src/test/scala/com/evolutiongaming/cluster/sharding/ShardingStrategySpec.scala +++ b/src/test/scala/com/evolutiongaming/cluster/sharding/ShardingStrategySpec.scala @@ -1,6 +1,7 @@ package com.evolutiongaming.cluster.sharding import cats.Id +import cats.effect.SyncIO import scala.collection.immutable.IndexedSeq import org.scalatest.matchers.should.Matchers @@ -46,6 +47,40 @@ class ShardingStrategySpec extends AnyWordSpec with ActorSpec with Matchers { Set.empty) shouldEqual Nil } + "track unallocated" in { + val maxSimulations = 2 + val test = for { + strategy <- RebalanceAllStrategy[SyncIO]() + .takeShards(SyncIO.pure(maxSimulations)) + .withTrackUnallocated + initAllocation = Map( + region1 -> IndexedSeq(shard1, shard2, shard3, shard4, shard5), + ) + rebalanceInitial <- strategy.rebalance(initAllocation, Set.empty) + afterShutdownAllocation = Map( + region1 -> IndexedSeq(shard3, shard4, shard5), + ) + rebalanceAfterShutdown <- strategy.rebalance(afterShutdownAllocation, Set.empty) + _ <- strategy.allocate(region2, shard1, afterShutdownAllocation) + afterAllocateAllocation = Map( + region1 -> IndexedSeq(shard3, shard4, shard5), + region2 -> IndexedSeq(shard1) + ) + rebalanceAfterAllocation <- strategy.rebalance( + afterAllocateAllocation, + Set.empty + ) + _ = println(rebalanceInitial) + _ = println(rebalanceAfterShutdown) + _ = println(rebalanceAfterAllocation) + } yield { + rebalanceInitial should have size 2 + rebalanceAfterShutdown should have size 0 + rebalanceAfterAllocation should have size 1 + } + test.unsafeRunSync() + } + "least shards" in { val strategy = LeastShardsStrategy[Id]() strategy.allocate(region1, shard2, Map(