Skip to content

Commit

Permalink
ORIG-816: add strategy that tracks unallocated shards
Browse files Browse the repository at this point in the history
  • Loading branch information
BrainHorse committed Mar 19, 2024
1 parent b3e4a08 commit 47d1fe2
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,11 @@ project/plugins/project/
*.iml
*.ipr

# VSCode + Metals
.bloop
.metals
.vscode
metals.sbt

# Mac
.DS_Store
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.9.9
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.7.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.11")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,46 @@ object ShardingStrategy {
}
}

/**
* Adds a track of unallocated shards.
* If we configured sharding in such a way that it automatically re-create shards and entities inside of it after a rebalance,
* we know that all shards returned from rebalance call would be allocated in the nearest future.
* Term unallocated shard in this context means such shard that was stopped on a previous node, but not yet allocated on a new one.
*/
object TrackUnallocated {

def of[F[_] : Sync](
strategy: ShardingStrategy[F],
): F[ShardingStrategy[F]] =
for {
unallocatedShards <- Ref[F].of(Set.empty[Shard])
} yield {
apply(unallocatedShards, strategy)
}

def apply[F[_] : FlatMap](
unallocatedShards: Ref[F, Set[Shard]],
strategy: ShardingStrategy[F],
): ShardingStrategy[F] = {

new ShardingStrategy[F] {

def allocate(requester: Region, shard: Shard, current: Allocation) =
for {
region <- strategy.allocate(requester, shard, current)
_ <- unallocatedShards.update(_.filterNot(_ == shard))
} yield region

def rebalance(current: Allocation, inProgress: Set[Shard]) =
for {
lastUnallocated <- unallocatedShards.get
shards <- strategy.rebalance(current, inProgress ++ lastUnallocated)
_ <- unallocatedShards.update(_ ++ shards)
} yield shards
}
}
}


implicit class ShardingStrategyOps[F[_]](val self: ShardingStrategy[F]) extends AnyVal {

Expand Down Expand Up @@ -337,6 +377,8 @@ object ShardingStrategy {
ShardRebalanceCooldown.of[F](cooldown, self)
}

def withTrackUnallocated(implicit F: Sync[F]): F[ShardingStrategy[F]] =
TrackUnallocated.of(self)

def toAllocationStrategy(
fallback: Allocate = Allocate.Default)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class AllocationStrategyProxySpec extends AsyncFunSuite with ActorSpec with Matc
private val shard = "shard"
private val ignore = (msg: () => String) => {msg(); ()}

private implicit val addressOf = AddressOf(actorSystem)
private implicit val addressOf: AddressOf = AddressOf(actorSystem)

test("allocate") {
val allocation = Map((region, IndexedSeq(shard)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class LoggingStrategySpec extends AnyFunSuite with ActorSpec with Matchers {
private val shard = "shard"
private val address = Address("", "", "127.0.0.1", 2552)

private implicit val addressOf = AddressOf.const(address)
private implicit val addressOf: AddressOf = AddressOf.const(address)

private val log = (msg: () => String) => {
StateT { state =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.cluster.sharding

import cats.Id
import cats.effect.SyncIO

import scala.collection.immutable.IndexedSeq
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -46,6 +47,40 @@ class ShardingStrategySpec extends AnyWordSpec with ActorSpec with Matchers {
Set.empty) shouldEqual Nil
}

"track unallocated" in {
val maxSimulations = 2
val test = for {
strategy <- RebalanceAllStrategy[SyncIO]()
.takeShards(SyncIO.pure(maxSimulations))
.withTrackUnallocated
initAllocation = Map(
region1 -> IndexedSeq(shard1, shard2, shard3, shard4, shard5),
)
rebalanceInitial <- strategy.rebalance(initAllocation, Set.empty)
afterShutdownAllocation = Map(
region1 -> IndexedSeq(shard3, shard4, shard5),
)
rebalanceAfterShutdown <- strategy.rebalance(afterShutdownAllocation, Set.empty)
_ <- strategy.allocate(region2, shard1, afterShutdownAllocation)
afterAllocateAllocation = Map(
region1 -> IndexedSeq(shard3, shard4, shard5),
region2 -> IndexedSeq(shard1)
)
rebalanceAfterAllocation <- strategy.rebalance(
afterAllocateAllocation,
Set.empty
)
_ = println(rebalanceInitial)
_ = println(rebalanceAfterShutdown)
_ = println(rebalanceAfterAllocation)
} yield {
rebalanceInitial should have size 2
rebalanceAfterShutdown should have size 0
rebalanceAfterAllocation should have size 1
}
test.unsafeRunSync()
}

"least shards" in {
val strategy = LeastShardsStrategy[Id]()
strategy.allocate(region1, shard2, Map(
Expand Down

0 comments on commit 47d1fe2

Please sign in to comment.