fix(dc_topology_rf_change): wait for tablets balance after revert_to_original_keyspaces_rf

    When the replication factor is increased after a new node has been added,
    wait for the tablet migration to complete.
yarongilor authored and fruch committed Sep 3, 2024
1 parent 7992178 commit 12fd46f
Showing 5 changed files with 77 additions and 46 deletions.
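
In context, the flow this commit touches looks roughly like the sketch below, a hedged reconstruction from the diffs that follow. The `cluster`, `nemesis`, and `target_node` objects are assumed to be wired up as in SCT; only `decommission`, `add_new_nodes`, `DataCenterTopologyRfControl`, and `revert_to_original_keyspaces_rf` are names taken from the change itself:

    # Hedged sketch of the decommission/re-add nemesis flow (not verbatim SCT code).
    dc_topology_rf_change = cluster.decommission(target_node)  # DataCenterTopologyRfControl when
                                                               # tablets are enabled, else None
    new_node = nemesis.add_new_nodes(count=1)[0]               # itself waits for tablets to balance
    if dc_topology_rf_change:
        # Restore the original RF and, with this fix, also wait for the resulting
        # tablet migration to finish before the disruption is considered done.
        dc_topology_rf_change.revert_to_original_keyspaces_rf(node_to_wait_for_balance=new_node)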
6 changes: 3 additions & 3 deletions sdcm/cluster.py
@@ -161,7 +161,7 @@
     NodeNotReady,
     SstablesNotFound,
 )
-from sdcm.utils.replication_strategy_utils import ReplicationStrategy, DataCenterTopologyRfChange
+from sdcm.utils.replication_strategy_utils import ReplicationStrategy, DataCenterTopologyRfControl
 
 # Test duration (min). Parameter used to keep instances produced by tests that
 # are supposed to run longer than 24 hours from being killed
@@ -4991,10 +4991,10 @@ def get_node_ip_list(verification_node):
         self.terminate_node(node)  # pylint: disable=no-member
         self.test_config.tester_obj().monitors.reconfigure_scylla_monitoring()
 
-    def decommission(self, node: BaseNode, timeout: int | float = None) -> DataCenterTopologyRfChange | None:
+    def decommission(self, node: BaseNode, timeout: int | float = None) -> DataCenterTopologyRfControl | None:
         with node.parent_cluster.cql_connection_patient(node) as session:
             if tablets_enabled := is_tablets_feature_enabled(session):
-                dc_topology_rf_change = DataCenterTopologyRfChange(target_node=node)
+                dc_topology_rf_change = DataCenterTopologyRfControl(target_node=node)
                 dc_topology_rf_change.decrease_keyspaces_rf()
         with adaptive_timeout(operation=Operations.DECOMMISSION, node=node):
             node.run_nodetool("decommission", timeout=timeout, long_running=True, retry=0)
32 changes: 4 additions & 28 deletions sdcm/nemesis.py
@@ -73,7 +73,6 @@
 from sdcm.provision.scylla_yaml import SeedProvider
 from sdcm.provision.helpers.certificate import update_certificate, TLSAssets
 from sdcm.remote.libssh2_client.exceptions import UnexpectedExit as Libssh2UnexpectedExit
-from sdcm.rest.remote_curl_client import RemoteCurlClient
 from sdcm.sct_events import Severity
 from sdcm.sct_events.database import DatabaseLogEvent
 from sdcm.sct_events.decorators import raise_event_on_failure
@@ -128,6 +127,7 @@
     NetworkTopologyReplicationStrategy, ReplicationStrategy, SimpleReplicationStrategy
 from sdcm.utils.sstable.load_utils import SstableLoadUtils
 from sdcm.utils.sstable.sstable_utils import SstableUtils
+from sdcm.utils.tablets.common import wait_for_tablets_balanced
 from sdcm.utils.toppartition_util import NewApiTopPartitionCmd, OldApiTopPartitionCmd
 from sdcm.utils.version_utils import MethodVersionNotFound, scylla_versions
 from sdcm.utils.raft import Group0MembersNotConsistentWithTokenRingMembersException, TopologyOperations
@@ -1326,7 +1326,7 @@ def _nodetool_decommission(self, add_node=True):
                 new_node.set_seed_flag(target_is_seed)
                 self.cluster.update_seed_provider()
             if dc_topology_rf_change:
-                dc_topology_rf_change.revert_to_original_keyspaces_rf()
+                dc_topology_rf_change.revert_to_original_keyspaces_rf(node_to_wait_for_balance=new_node)
             try:
                 self.nodetool_cleanup_on_all_nodes_parallel()
             finally:
@@ -1536,7 +1536,7 @@ def _disrupt_kubernetes_then_decommission_and_add_scylla_node(self, disruption_m

         new_node = self.add_new_nodes(count=1, rack=node.rack)[0]
         if dc_topology_rf_change:
-            dc_topology_rf_change.revert_to_original_keyspaces_rf()
+            dc_topology_rf_change.revert_to_original_keyspaces_rf(node_to_wait_for_balance=new_node)
         self.unset_current_running_nemesis(new_node)
 
         # NOTE: wait for all other neighbour pods become ready
@@ -4033,7 +4033,7 @@ def disrupt_corrupt_then_scrub(self):
@latency_calculator_decorator(legend="Adding new nodes")
def add_new_nodes(self, count, rack=None, instance_type: str = None) -> list[BaseNode]:
nodes = self._add_and_init_new_cluster_nodes(count, rack=rack, instance_type=instance_type)
self._wait_for_tablets_balanced(nodes[0])
wait_for_tablets_balanced(nodes[0])
return nodes

@latency_calculator_decorator(legend="Decommission nodes: remove nodes from cluster")
@@ -5108,30 +5108,6 @@ def disrupt_disable_binary_gossip_execute_major_compaction(self):
             self.target_node.restart_scylla_server()
             raise
 
-    def _wait_for_tablets_balanced(self, node):
-        """
-        Waiting for tablets to be balanced using REST API.
-        doing it several times as there's a risk of:
-        "currently a small time window after adding nodes and before load balancing starts during which
-        topology may appear as quiesced because the state machine goes through an idle state before it enters load balancing state"
-        """
-        if not node.raft.is_enabled:
-            self.log.info("Raft is disabled, skipping wait for balance")
-            return
-        with self.cluster.cql_connection_patient(node=node) as session:
-            if not is_tablets_feature_enabled(session):
-                self.log.info("Tablets are disabled, skipping wait for balance")
-                return
-        time.sleep(60)  # one minute gap before checking, just to give some time to the state machine
-        client = RemoteCurlClient(host="127.0.0.1:10000", endpoint="", node=node)
-        self.log.info("Waiting for tablets to be balanced")
-        for _ in range(3):
-            client.run_remoter_curl(method="POST", path="storage_service/quiesce_topology",
-                                    params={}, timeout=3600, retry=3)
-            time.sleep(5)
-        self.log.info("Tablets are balanced")
-
 
 def disrupt_method_wrapper(method, is_exclusive=False):  # pylint: disable=too-many-statements  # noqa: PLR0915
     """
7 changes: 4 additions & 3 deletions sdcm/rest/remote_curl_client.py
@@ -11,9 +11,10 @@
 #
 # Copyright (c) 2022 ScyllaDB
 
-from typing import Literal
+from typing import Literal, TYPE_CHECKING
 
-from sdcm.cluster import BaseNode
+if TYPE_CHECKING:
+    from sdcm.cluster import BaseNode
 from sdcm.rest.rest_client import RestClient
 
 
@@ -22,7 +23,7 @@ class ScyllaApiException(Exception):


 class RemoteCurlClient(RestClient):
-    def __init__(self, host: str, endpoint: str, node: BaseNode):
+    def __init__(self, host: str, endpoint: str, node: 'BaseNode'):
         super().__init__(host=host, endpoint=endpoint)
         self._node = node
         self._remoter = self._node.remoter
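
The `TYPE_CHECKING` change above breaks the import cycle this commit would otherwise introduce: `sdcm/cluster.py` imports `replication_strategy_utils`, which now imports `sdcm/utils/tablets/common.py`, which imports `RemoteCurlClient`, so a runtime `from sdcm.cluster import BaseNode` here would close the loop. The standard pattern, as a minimal two-module sketch (hypothetical module names, not SCT code):

    # b.py: needs a.Thing only for type annotations, while a.py imports b at runtime
    from typing import TYPE_CHECKING

    if TYPE_CHECKING:          # evaluated by type checkers only, never at runtime
        from a import Thing

    def use(thing: 'Thing') -> None:   # string annotation is resolved lazily
        thing.run()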
46 changes: 34 additions & 12 deletions sdcm/utils/replication_strategy_utils.py
@@ -7,6 +7,8 @@

 from sdcm.utils.cql_utils import cql_quote_if_needed
 from sdcm.utils.database_query_utils import is_system_keyspace, LOGGER
+from sdcm.utils.tablets.common import wait_for_tablets_balanced
+
 if TYPE_CHECKING:
     from sdcm.cluster import BaseNode
 
@@ -123,13 +125,23 @@ def __call__(self, **keyspaces: ReplicationStrategy) -> None:
     strategy.apply(self.node, keyspace)
 
 
-class DataCenterTopologyRfChange:
+class DataCenterTopologyRfControl:
     """
-    If any keyspace RF equals to number-of-cluster-nodes, where tablets are in use,
-    then a decommission is not supported.
-    In this case, the user has to decrease the replication-factor of any such keyspace first.
-    Later on, after adding a new node, such a keyspace can be reconfigured back to its original
-    replication-factor value.
+    Manages and controls the replication factor (RF) of keyspaces in a ScyllaDB
+    data center when nodes are removed from, or re-added to, the cluster.
+
+    **Purpose**:
+    - When a keyspace has an RF equal to the total number of nodes in a data center
+      and tablets are in use, decommissioning a node is not supported.
+    - This class temporarily decreases the RF of such keyspaces before a node is
+      decommissioned and reverts them to their original RF after a new node is added.
+
+    **Usage**:
+    1. **`decrease_keyspaces_rf`**: Identifies keyspaces with RF equal to the total
+       number of nodes in the data center and decreases their RF by one, so that
+       decommissioning a node is allowed (with tablets).
+    2. **`revert_to_original_keyspaces_rf`**: Reverts the RF of those keyspaces back
+       to their original values after a new node is added to the data center.
+
+    Attributes:
+    - `target_node`: The node to decommission.
+    - `datacenter`: The data center to which the target node belongs.
+    - `decreased_rf_keyspaces`: A list of keyspaces whose RF has been decreased.
+    - `original_nodes_number`: The original number of nodes in the data center (before decommission).
     """
 
     def __init__(self, target_node: 'BaseNode') -> None:
@@ -188,14 +200,24 @@ def _alter_keyspace_rf(self, keyspace: str, replication_factor: int, session):
LOGGER.error(f"{message} Failed with: {error}")
raise error

def revert_to_original_keyspaces_rf(self):
LOGGER.debug(f"Reverting keyspaces replication factor to original value of {self.datacenter}..")
with self.cluster.cql_connection_patient(self.cluster.nodes[0]) as session:
for keyspace in self.decreased_rf_keyspaces:
self._alter_keyspace_rf(keyspace=keyspace, replication_factor=self.original_nodes_number,
session=session)
def revert_to_original_keyspaces_rf(self, node_to_wait_for_balance: 'BaseNode' = None):
if self.decreased_rf_keyspaces:
LOGGER.debug(f"Reverting keyspaces replication factor to original value of {self.datacenter}..")
with self.cluster.cql_connection_patient(self.cluster.nodes[0]) as session:
for keyspace in self.decreased_rf_keyspaces:
self._alter_keyspace_rf(keyspace=keyspace, replication_factor=self.original_nodes_number,
session=session)
if node_to_wait_for_balance:
wait_for_tablets_balanced(node_to_wait_for_balance)

def decrease_keyspaces_rf(self):
"""
If any keyspace RF equals to number-of-cluster-nodes, where tablets are in use,
then a decommission is not supported.
In this case, the user has to decrease the replication-factor of any such keyspace first.
Later on, after adding a new node, such a keyspace can be reconfigured back to its original
replication-factor value.
"""
node = self.target_node
with self.cluster.cql_connection_patient(node) as session:
# Ensure that nodes_num is 2 or greater
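
Taken together, the class is meant to bracket a decommission. A hedged usage sketch (method names are from the diff; the `cluster` and `target_node` objects and the node-adding helper are assumptions):

    rf_control = DataCenterTopologyRfControl(target_node=target_node)
    rf_control.decrease_keyspaces_rf()        # keyspaces with RF == N are dropped to N - 1,
                                              # so a tablets-enabled decommission may proceed
    target_node.run_nodetool("decommission")

    new_node = add_one_node(cluster)          # assumed helper restoring the DC to N nodes
    # Revert RF to N and, new in this commit, wait until tablets are balanced again.
    rf_control.revert_to_original_keyspaces_rf(node_to_wait_for_balance=new_node)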
32 changes: 32 additions & 0 deletions sdcm/utils/tablets/common.py
@@ -1,6 +1,13 @@
 import logging
+import time
 from dataclasses import dataclass
 from typing import Optional
 
+from sdcm.rest.remote_curl_client import RemoteCurlClient
+from sdcm.utils.features import is_tablets_feature_enabled
+
+LOGGER = logging.getLogger(__name__)
+
 
 @dataclass
 class TabletsConfiguration:
@@ -14,3 +21,28 @@ def __str__(self):
             value = str(v).lower() if isinstance(v, bool) else v
             items.append(f"'{k}': {value}")
         return '{' + ', '.join(items) + '}'
+
+
+def wait_for_tablets_balanced(node):
+    """
+    Wait for tablets to be balanced, using the REST API.
+
+    Done several times, as there is a risk of:
+    "currently a small time window after adding nodes and before load balancing starts during which
+    topology may appear as quiesced because the state machine goes through an idle state before it enters load balancing state"
+    """
+    if not node.raft.is_enabled:
+        LOGGER.info("Raft is disabled, skipping wait for balance")
+        return
+    with node.parent_cluster.cql_connection_patient(node=node) as session:
+        if not is_tablets_feature_enabled(session):
+            LOGGER.info("Tablets are disabled, skipping wait for balance")
+            return
+    time.sleep(60)  # one minute gap before checking, just to give the state machine some time
+    client = RemoteCurlClient(host="127.0.0.1:10000", endpoint="", node=node)
+    LOGGER.info("Waiting for tablets to be balanced")
+    for _ in range(3):
+        client.run_remoter_curl(method="POST", path="storage_service/quiesce_topology",
+                                params={}, timeout=3600, retry=3)
+        time.sleep(5)
+    LOGGER.info("Tablets are balanced")
