Skip to content

Commit

Permalink
feature(nemesis): introduce lock for target selection
Browse files Browse the repository at this point in the history
since we run into multiple cases on parallel nemesis
that there were multiple nemesis using the same node

we are introducing a lock over the selection of target nodes
so we won't be able to pick it multiple times

Fixes: #6553
  • Loading branch information
fruch committed Dec 27, 2023
1 parent 751e689 commit 9e72716
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 67 deletions.
33 changes: 6 additions & 27 deletions sdcm/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import itertools
import json
import ipaddress

from importlib import import_module
from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr
from datetime import datetime
from textwrap import dedent
Expand All @@ -51,8 +51,8 @@
from cassandra.policies import RetryPolicy
from cassandra.policies import WhiteListRoundRobinPolicy, HostFilterPolicy, RoundRobinPolicy
from cassandra.query import SimpleStatement # pylint: disable=no-name-in-module

from argus.backend.util.enums import ResourceState

from sdcm.node_exporter_setup import NodeExporterSetup
from sdcm.db_log_reader import DbLogReader
from sdcm.mgmt import AnyManagerCluster, ScyllaManagerError
Expand Down Expand Up @@ -147,7 +147,6 @@
NodeNotReady,
SstablesNotFound,
)
from sdcm.utils.context_managers import run_nemesis

# Test duration (min). Parameter used to keep instances produced by tests that
# are supposed to run longer than 24 hours from being killed
Expand Down Expand Up @@ -284,7 +283,7 @@ def __init__(self, name, parent_cluster, ssh_login_info=None, base_logdir=None,
self.stop_wait_db_up_event = threading.Event()
self.lock = threading.Lock()

self._running_nemesis = None
self.running_nemesis = None

# We should disable bootstrap when we create nodes to establish the cluster,
# if we want to add more nodes when the cluster already exists, then we should
Expand Down Expand Up @@ -486,27 +485,6 @@ def system_log(self):
def continuous_events_registry(self) -> ContinuousEventsRegistry:
return self._continuous_events_registry

@property
def running_nemesis(self):
return self._running_nemesis

@running_nemesis.setter
def running_nemesis(self, nemesis):
"""Set name of nemesis which is started on node
Decorators:
running_nemesis.setter
Arguments:
nemesis {str} -- class name of Nemesis
"""

# Only one Nemesis could run on node. Limitation
# of first version for X parallel Nemesis

with self.lock:
self._running_nemesis = nemesis

@cached_property
def distro(self):
self.log.debug("Trying to detect Linux distribution...")
Expand Down Expand Up @@ -4335,8 +4313,9 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias",
traceback=traceback.format_exc()).publish()
try:
target_node = [node for node in db_cluster.nodes if not node.running_nemesis][0]
with run_nemesis(node=target_node, nemesis_name="KMS encryption check"):
nemesis_class = self.nemesis[0] if self.nemesis else getattr(
import_module('sdcm.nemesis'), "Nemesis")
with nemesis_class.run_nemesis(node_list=db_cluster.nodes, nemesis_label="KMS encryption check") as target_node:
ks_cf = db_cluster.get_non_system_ks_cf_list(db_node=target_node, filter_out_mv=True)[0]
sstable_util = SstableUtils(db_node=target_node, ks_cf=ks_cf)
encryption_results = sstable_util.is_sstable_encrypted()
Expand Down
12 changes: 5 additions & 7 deletions sdcm/kcl_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
from functools import cached_property
from typing import Dict

from sdcm.nemesis import Nemesis
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.stress.base import format_stress_cmd_error
from sdcm.utils.docker_remote import RemoteDocker
from sdcm.sct_events.system import InfoEvent
from sdcm.sct_events.loaders import KclStressEvent
from sdcm.cluster import BaseNode


LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -132,14 +132,12 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
end_time = time.time() + self._timeout

while not self._stop_event.is_set():
node: BaseNode = self.db_node_to_query(loader)
node.running_nemesis = "Compare tables size by cf-stats"
node.run_nodetool('flush')
with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
node.run_nodetool('flush')

dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
src_size = node.get_cfstats(src_table)['Number of partitions (estimate)']
dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
src_size = node.get_cfstats(src_table)['Number of partitions (estimate)']

node.running_nemesis = None
status = f"== CompareTablesSizesThread: dst table/src table number of partitions: {dst_size}/{src_size} =="
LOGGER.info(status)
InfoEvent(f'[{time.time()}/{end_time}] {status}').publish()
Expand Down
68 changes: 44 additions & 24 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import json
import itertools
from distutils.version import LooseVersion
from contextlib import ExitStack
from contextlib import ExitStack, contextmanager
from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable
from functools import wraps, partial
from collections import defaultdict, Counter, namedtuple
Expand Down Expand Up @@ -162,6 +162,8 @@
"disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
)

NEMESIS_TARGET_SELECTION_LOCK = Lock()


class DefaultValue: # pylint: disable=too-few-public-methods
"""
Expand Down Expand Up @@ -292,6 +294,23 @@ def wrapper(self, *args, **kwargs):
setattr(cls, func.__name__, wrapper) # bind it to Nemesis class
return func # returning func means func can still be used normally

@staticmethod
@contextmanager
def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
"""
pick a node out of a `node_list`, and mark is as running_nemesis
for the duration of this context
"""
with NEMESIS_TARGET_SELECTION_LOCK:
free_nodes = [node for node in node_list if not node.running_nemesis]
node = random.choice(free_nodes)
node.running_nemesis = nemesis_label
try:
yield node
finally:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None

def use_nemesis_seed(self):
if nemesis_seed := self.tester.params.get("nemesis_seed"):
random.seed(nemesis_seed)
Expand All @@ -318,12 +337,14 @@ def publish_event(self, disrupt, status=True, data=None):
DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish()

def set_current_running_nemesis(self, node):
node.running_nemesis = self.current_disruption
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = self.current_disruption

@staticmethod
def unset_current_running_nemesis(node):
if node is not None:
node.running_nemesis = None
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None

def _get_target_nodes(
self,
Expand Down Expand Up @@ -363,21 +384,22 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No
if is_seed is DefaultValue - if self.filter_seed is True it act as if is_seed=False,
otherwise it will act as if is_seed is None
"""
self.unset_current_running_nemesis(self.target_node)
nodes = self._get_target_nodes(is_seed=is_seed, dc_idx=dc_idx, rack=rack)
if not nodes:
dc_str = '' if dc_idx is None else f'dc {dc_idx} '
rack_str = '' if rack is None else f'rack {rack} '
raise UnsupportedNemesis(
f"Can't allocate node from {dc_str}{rack_str}to run nemesis on")
if allow_only_last_node_in_rack:
self.target_node = nodes[-1]
else:
self.target_node = random.choice(nodes)
with NEMESIS_TARGET_SELECTION_LOCK:
self.unset_current_running_nemesis(self.target_node)
nodes = self._get_target_nodes(is_seed=is_seed, dc_idx=dc_idx, rack=rack)
if not nodes:
dc_str = '' if dc_idx is None else f'dc {dc_idx} '
rack_str = '' if rack is None else f'rack {rack} '
raise UnsupportedNemesis(
f"Can't allocate node from {dc_str}{rack_str}to run nemesis on")
if allow_only_last_node_in_rack:
self.target_node = nodes[-1]
else:
self.target_node = random.choice(nodes)

self.set_current_running_nemesis(node=self.target_node)
self.log.info('Current Target: %s with running nemesis: %s',
self.target_node, self.target_node.running_nemesis)
self.target_node.running_nemesis = self.current_disruption
self.log.info('Current Target: %s with running nemesis: %s',
self.target_node, self.target_node.running_nemesis)

@raise_event_on_failure
def run(self, interval=None, cycles_count: int = -1):
Expand Down Expand Up @@ -1107,11 +1129,7 @@ def get_disrupt_name(self):
def get_class_name(self):
return self.__class__.__name__.replace('Monkey', '')

def _set_current_disruption(self, label=None, node=None):
self.target_node = node if node else self.target_node

if not label:
label = "%s on target node %s" % (self.__class__.__name__, self.target_node)
def set_current_disruption(self, label=None):
self.log.debug('Set current_disruption -> %s', label)
self.current_disruption = label

Expand Down Expand Up @@ -5030,8 +5048,10 @@ def wrapper(*args, **kwargs): # pylint: disable=too-many-statements
# NOTE: exclusive nemesis will wait before the end of all other ones
time.sleep(10)

args[0].set_target_node()
args[0].current_disruption = "".join(p.capitalize() for p in method_name.replace("disrupt_", "").split("_"))
args[0].set_current_disruption(f"{args[0].current_disruption}")
args[0].set_target_node()

args[0].cluster.check_cluster_health()
num_nodes_before = len(args[0].cluster.nodes)
start_time = time.time()
Expand All @@ -5042,7 +5062,7 @@ def wrapper(*args, **kwargs): # pylint: disable=too-many-statements
result = None
status = True
# pylint: disable=protected-access
args[0]._set_current_disruption(f"{args[0].current_disruption} {args[0].target_node}")

args[0].set_current_running_nemesis(node=args[0].target_node)
log_info = {
'operation': args[0].current_disruption,
Expand Down
9 changes: 0 additions & 9 deletions sdcm/utils/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,3 @@ def nodetool_context(node, start_command, end_command):
yield result
finally:
node.run_nodetool(end_command)


@contextmanager
def run_nemesis(node: 'BaseNode', nemesis_name: str):
node.running_nemesis = nemesis_name
try:
yield node
finally:
node.running_nemesis = None

0 comments on commit 9e72716

Please sign in to comment.