Skip to content

Commit

Permalink
Add strategy that tracks unallocated shards (#199)
Browse files Browse the repository at this point in the history
* Add strategy that tracks unallocated shards
  • Loading branch information
BrainHorse committed Mar 19, 2024
1 parent 3a41fe5 commit 4c265b0
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 8 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
strategy:
matrix:
scala:
- 2.13.5
- 2.12.13
- 2.13.10
- 2.12.17

steps:
- uses: actions/checkout@v2
Expand Down
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,11 @@ project/plugins/project/
*.iml
*.ipr

# VSCode + Metals
.bloop
.metals
.vscode
metals.sbt

# Mac
.DS_Store
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ organizationHomepage := Some(url("http://evolution.com"))

scalaVersion := crossScalaVersions.value.head

crossScalaVersions := Seq("2.13.5", "2.12.13")
crossScalaVersions := Seq("2.13.10", "2.12.17")

scalacOptions -= "-Ywarn-unused:params"

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.2
sbt.version=1.9.9
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.7.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.11")

addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,46 @@ object ShardingStrategy {
}
}

/**
* Adds a track of unallocated shards.
* If we configured sharding in such a way that it automatically re-create shards and entities inside of it after a rebalance,
* we know that all shards returned from rebalance call would be allocated in the nearest future.
* Term unallocated shard in this context means such shard that was stopped on a previous node, but not yet allocated on a new one.
*/
object TrackUnallocated {

def of[F[_] : Sync](
strategy: ShardingStrategy[F],
): F[ShardingStrategy[F]] =
for {
unallocatedShards <- Ref[F].of(Set.empty[Shard])
} yield {
apply(unallocatedShards, strategy)
}

def apply[F[_] : FlatMap](
unallocatedShards: Ref[F, Set[Shard]],
strategy: ShardingStrategy[F],
): ShardingStrategy[F] = {

new ShardingStrategy[F] {

def allocate(requester: Region, shard: Shard, current: Allocation) =
for {
region <- strategy.allocate(requester, shard, current)
_ <- unallocatedShards.update(_.filterNot(_ == shard))
} yield region

def rebalance(current: Allocation, inProgress: Set[Shard]) =
for {
lastUnallocated <- unallocatedShards.get
shards <- strategy.rebalance(current, inProgress ++ lastUnallocated)
_ <- unallocatedShards.update(_ ++ shards)
} yield shards
}
}
}


implicit class ShardingStrategyOps[F[_]](val self: ShardingStrategy[F]) extends AnyVal {

Expand Down Expand Up @@ -343,6 +383,8 @@ object ShardingStrategy {
ShardRebalanceCooldown.of[F](cooldown, self)
}

def withTrackUnallocated(implicit F: Sync[F]): F[ShardingStrategy[F]] =
TrackUnallocated.of(self)

def toAllocationStrategy(
fallback: Allocate = Allocate.Default)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class AllocationStrategyProxySpec extends AsyncFunSuite with ActorSpec with Matc
private val region = RegionOf(actorSystem)
private val shard = "shard"
private val ignore = (msg: () => String) => {msg(); ()}
private implicit val addressOf = AddressOf(actorSystem)

private implicit val addressOf: AddressOf = AddressOf(actorSystem)

test("allocate") {
val allocation = Map((region, IndexedSeq(shard)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class LoggingStrategySpec extends AnyFunSuite with ActorSpec with Matchers {
private val shard = "shard"
private val address = Address("", "", "127.0.0.1", 2552)

private implicit val addressOf = AddressOf.const(address)
private implicit val addressOf: AddressOf = AddressOf.const(address)

private val log = (msg: () => String) => {
StateT { state =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.cluster.sharding

import cats.Id
import cats.effect.SyncIO

import scala.collection.immutable.IndexedSeq
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -46,6 +47,37 @@ class ShardingStrategySpec extends AnyWordSpec with ActorSpec with Matchers {
Set.empty) shouldEqual Nil
}

"track unallocated" in {
val maxSimulations = 2
val test = for {
strategy <- RebalanceAllStrategy[SyncIO]()
.takeShards(SyncIO.pure(maxSimulations))
.withTrackUnallocated
initAllocation = Map(
region1 -> IndexedSeq(shard1, shard2, shard3, shard4, shard5),
)
rebalanceInitial <- strategy.rebalance(initAllocation, Set.empty)
afterShutdownAllocation = Map(
region1 -> IndexedSeq(shard3, shard4, shard5),
)
rebalanceAfterShutdown <- strategy.rebalance(afterShutdownAllocation, Set.empty)
_ <- strategy.allocate(region2, shard1, afterShutdownAllocation)
afterAllocateAllocation = Map(
region1 -> IndexedSeq(shard3, shard4, shard5),
region2 -> IndexedSeq(shard1)
)
rebalanceAfterAllocation <- strategy.rebalance(
afterAllocateAllocation,
Set.empty
)
} yield {
rebalanceInitial should have size 2
rebalanceAfterShutdown should have size 0
rebalanceAfterAllocation should have size 1
}
test.unsafeRunSync()
}

"least shards" in {
val strategy = LeastShardsStrategy[Id]()
strategy.allocate(region1, shard2, Map(
Expand Down

0 comments on commit 4c265b0

Please sign in to comment.