[Backport 2023.1] feature(nemesis): introduce lock for target selection #8491

Merged: 1 commit, Aug 28, 2024
sdcm/cluster.py: 35 changes (7 additions, 28 deletions)
@@ -28,7 +28,7 @@
 import itertools
 import json
 import ipaddress
-
+from importlib import import_module
 from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr
 from datetime import datetime
 from textwrap import dedent
@@ -52,7 +52,6 @@
 from cassandra.policies import RetryPolicy
 from cassandra.policies import WhiteListRoundRobinPolicy, HostFilterPolicy, RoundRobinPolicy
 from cassandra.query import SimpleStatement  # pylint: disable=no-name-in-module
-
 from argus.backend.util.enums import ResourceState
 from sdcm.node_exporter_setup import NodeExporterSetup, SyslogNgExporterSetup
 from sdcm.db_log_reader import DbLogReader
@@ -153,7 +152,6 @@
     NodeNotReady,
     SstablesNotFound,
 )
-from sdcm.utils.context_managers import run_nemesis
 
 # Test duration (min). Parameter used to keep instances produced by tests that
 # are supposed to run longer than 24 hours from being killed
@@ -290,7 +288,7 @@ def __init__(self, name, parent_cluster, ssh_login_info=None, base_logdir=None,
         self.stop_wait_db_up_event = threading.Event()
         self.lock = threading.Lock()
 
-        self._running_nemesis = None
+        self.running_nemesis = None
 
         # We should disable bootstrap when we create nodes to establish the cluster,
         # if we want to add more nodes when the cluster already exists, then we should
@@ -548,27 +546,6 @@ def system_log(self):
     def continuous_events_registry(self) -> ContinuousEventsRegistry:
         return self._continuous_events_registry
 
-    @property
-    def running_nemesis(self):
-        return self._running_nemesis
-
-    @running_nemesis.setter
-    def running_nemesis(self, nemesis):
-        """Set name of nemesis which is started on node
-
-        Decorators:
-            running_nemesis.setter
-
-        Arguments:
-            nemesis {str} -- class name of Nemesis
-        """
-
-        # Only one Nemesis could run on node. Limitation
-        # of first version for X parallel Nemesis
-
-        with self.lock:
-            self._running_nemesis = nemesis
-
     @cached_property
     def distro(self):
         self.log.debug("Trying to detect Linux distribution...")
@@ -4498,9 +4475,11 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
                    self.log.warning("KMS encryption check is skipped due to the 'scylla-enterprise/issues/3896'")
                    continue
                try:
-                   target_node = [node for node in db_cluster.nodes if not node.running_nemesis][0]
-                   self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)
-                   with run_nemesis(node=target_node, nemesis_name="KMS encryption check"):
+                   nemesis_class = self.nemesis[0] if self.nemesis else getattr(
+                       import_module('sdcm.nemesis'), "Nemesis")
+                   with nemesis_class.run_nemesis(node_list=db_cluster.nodes, nemesis_label="KMS encryption check") as target_node:
+                       self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)
+
                        ks_cf_list = db_cluster.get_non_system_ks_cf_list(
                            db_node=target_node, filter_out_table_with_counter=True, filter_out_mv=True,
                            filter_empty_tables=True)
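Two details of this hunk are easy to miss: `run_nemesis` is a `@staticmethod`, so it can be called on a `Nemesis` instance (`self.nemesis[0]`) or on the class itself interchangeably, and the class is resolved lazily via `import_module`, presumably because `sdcm.cluster` cannot import `sdcm.nemesis` at module scope without an import cycle. A minimal sketch of that resolution step, assuming the SCT package layout (the helper name is hypothetical):

    from importlib import import_module

    def resolve_nemesis(registered_nemeses):
        # Prefer an already-registered nemesis object; otherwise fall back to the
        # base Nemesis class, imported lazily to avoid a module-level import cycle.
        # run_nemesis() is a @staticmethod, so either result works as a call target.
        if registered_nemeses:
            return registered_nemeses[0]
        return getattr(import_module('sdcm.nemesis'), "Nemesis")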
sdcm/kcl_thread.py: 12 changes (5 additions, 7 deletions)
@@ -21,12 +21,12 @@
 from functools import cached_property
 from typing import Dict
 
+from sdcm.nemesis import Nemesis
 from sdcm.stress_thread import DockerBasedStressThread
 from sdcm.stress.base import format_stress_cmd_error
 from sdcm.utils.docker_remote import RemoteDocker
 from sdcm.sct_events.system import InfoEvent
 from sdcm.sct_events.loaders import KclStressEvent
-from sdcm.cluster import BaseNode
 
 
 LOGGER = logging.getLogger(__name__)
@@ -132,14 +132,12 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
         end_time = time.time() + self._timeout
 
         while not self._stop_event.is_set():
-            node: BaseNode = self.db_node_to_query(loader)
-            node.running_nemesis = "Compare tables size by cf-stats"
-            node.run_nodetool('flush')
+            with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
+                node.run_nodetool('flush')
 
-            dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
-            src_size = node.get_cfstats(src_table)['Number of partitions (estimate)']
+                dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
+                src_size = node.get_cfstats(src_table)['Number of partitions (estimate)']
 
-            node.running_nemesis = None
             status = f"== CompareTablesSizesThread: dst table/src table number of partitions: {dst_size}/{src_size} =="
             LOGGER.info(status)
             InfoEvent(f'[{time.time()}/{end_time}] {status}').publish()
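Beyond deduplication, this refactor fixes a leak: the old code cleared `node.running_nemesis` only on the success path, so an exception from `run_nodetool()` or `get_cfstats()` left the node marked busy forever, and the flag was toggled without any lock. The context manager's `finally` clause releases the node either way. A self-contained sketch of that guarantee, using a stub node class (illustrative, not SCT code):

    import random
    from contextlib import contextmanager
    from threading import Lock

    NEMESIS_TARGET_SELECTION_LOCK = Lock()

    class StubNode:
        # Stand-in for sdcm.cluster.BaseNode; only the attribute that matters here.
        def __init__(self, name):
            self.name = name
            self.running_nemesis = None

    @contextmanager
    def run_nemesis(node_list, nemesis_label):
        # Same shape as the Nemesis.run_nemesis staticmethod added in sdcm/nemesis.py.
        with NEMESIS_TARGET_SELECTION_LOCK:
            free_nodes = [n for n in node_list if not n.running_nemesis]
            assert free_nodes, f"no free nodes for `{nemesis_label}`"
            node = random.choice(free_nodes)
            node.running_nemesis = nemesis_label
        try:
            yield node
        finally:
            with NEMESIS_TARGET_SELECTION_LOCK:
                node.running_nemesis = None  # released even if the body raised

    nodes = [StubNode("node-1"), StubNode("node-2")]
    try:
        with run_nemesis(nodes, "Compare tables size by cf-stats") as node:
            raise RuntimeError("simulated cf-stats failure")
    except RuntimeError:
        pass
    assert all(n.running_nemesis is None for n in nodes)  # nothing left marked busy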
sdcm/nemesis.py: 68 changes (44 additions, 24 deletions)
@@ -30,7 +30,7 @@
 import json
 import itertools
 from distutils.version import LooseVersion
-from contextlib import ExitStack
+from contextlib import ExitStack, contextmanager
 from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable
 from functools import wraps, partial
 from collections import defaultdict, Counter, namedtuple
@@ -164,6 +164,8 @@
     "disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
 )
 
+NEMESIS_TARGET_SELECTION_LOCK = Lock()
+
 
 class DefaultValue:  # pylint: disable=too-few-public-methods
     """
@@ -294,6 +296,24 @@ def wrapper(self, *args, **kwargs):
         setattr(cls, func.__name__, wrapper)  # bind it to Nemesis class
         return func  # returning func means func can still be used normally
 
+    @staticmethod
+    @contextmanager
+    def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
+        """
+        Pick a node out of `node_list` and mark it as running_nemesis
+        for the duration of this context.
+        """
+        with NEMESIS_TARGET_SELECTION_LOCK:
+            free_nodes = [node for node in node_list if not node.running_nemesis]
+            assert free_nodes, f"couldn't find nodes for running: `{nemesis_label}`, are all nodes running nemesis?"
+            node = random.choice(free_nodes)
+            node.running_nemesis = nemesis_label
+        try:
+            yield node
+        finally:
+            with NEMESIS_TARGET_SELECTION_LOCK:
+                node.running_nemesis = None
+
     def use_nemesis_seed(self):
         if nemesis_seed := self.tester.params.get("nemesis_seed"):
             random.seed(nemesis_seed)
@@ -320,12 +340,14 @@ def publish_event(self, disrupt, status=True, data=None):
         DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish()
 
     def set_current_running_nemesis(self, node):
-        node.running_nemesis = self.current_disruption
+        with NEMESIS_TARGET_SELECTION_LOCK:
+            node.running_nemesis = self.current_disruption
 
     @staticmethod
     def unset_current_running_nemesis(node):
         if node is not None:
-            node.running_nemesis = None
+            with NEMESIS_TARGET_SELECTION_LOCK:
+                node.running_nemesis = None
 
     def _get_target_nodes(
             self,
@@ -365,21 +387,21 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = None,
         if is_seed is DefaultValue - if self.filter_seed is True it acts as if is_seed=False,
         otherwise it will act as if is_seed is None
         """
-        self.unset_current_running_nemesis(self.target_node)
-        nodes = self._get_target_nodes(is_seed=is_seed, dc_idx=dc_idx, rack=rack)
-        if not nodes:
-            dc_str = '' if dc_idx is None else f'dc {dc_idx} '
-            rack_str = '' if rack is None else f'rack {rack} '
-            raise UnsupportedNemesis(
-                f"Can't allocate node from {dc_str}{rack_str}to run nemesis on")
-        if allow_only_last_node_in_rack:
-            self.target_node = nodes[-1]
-        else:
-            self.target_node = random.choice(nodes)
+        with NEMESIS_TARGET_SELECTION_LOCK:
+            nodes = self._get_target_nodes(is_seed=is_seed, dc_idx=dc_idx, rack=rack)
+            if not nodes:
+                dc_str = '' if dc_idx is None else f'dc {dc_idx} '
+                rack_str = '' if rack is None else f'rack {rack} '
+                raise UnsupportedNemesis(
+                    f"Can't allocate node from {dc_str}{rack_str}to run nemesis on")
+            if allow_only_last_node_in_rack:
+                self.target_node = nodes[-1]
+            else:
+                self.target_node = random.choice(nodes)
 
-        self.set_current_running_nemesis(node=self.target_node)
-        self.log.info('Current Target: %s with running nemesis: %s',
-                      self.target_node, self.target_node.running_nemesis)
+            self.target_node.running_nemesis = self.current_disruption
+            self.log.info('Current Target: %s with running nemesis: %s',
+                          self.target_node, self.target_node.running_nemesis)
 
     @raise_event_on_failure
     def run(self, interval=None, cycles_count: int = -1):
@@ -1110,11 +1132,7 @@ def get_disrupt_name(self):
     def get_class_name(self):
         return self.__class__.__name__.replace('Monkey', '')
 
-    def _set_current_disruption(self, label=None, node=None):
-        self.target_node = node if node else self.target_node
-
-        if not label:
-            label = "%s on target node %s" % (self.__class__.__name__, self.target_node)
+    def set_current_disruption(self, label=None):
         self.log.debug('Set current_disruption -> %s', label)
         self.current_disruption = label
 
@@ -5062,8 +5080,10 @@ def wrapper(*args, **kwargs):  # pylint: disable=too-many-statements
                 # NOTE: exclusive nemesis will wait before the end of all other ones
                 time.sleep(10)
 
-            args[0].set_target_node()
             args[0].current_disruption = "".join(p.capitalize() for p in method_name.replace("disrupt_", "").split("_"))
+            args[0].set_current_disruption(f"{args[0].current_disruption}")
+            args[0].set_target_node()
+
             args[0].cluster.check_cluster_health()
             num_nodes_before = len(args[0].cluster.nodes)
             start_time = time.time()
@@ -5074,7 +5094,7 @@
             result = None
             status = True
             # pylint: disable=protected-access
-            args[0]._set_current_disruption(f"{args[0].current_disruption} {args[0].target_node}")
+
             args[0].set_current_running_nemesis(node=args[0].target_node)
             log_info = {
                 'operation': args[0].current_disruption,
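The module-level lock is the heart of the change: with parallel nemeses, two threads could previously both observe `running_nemesis` as empty on the same node and claim it simultaneously; under the lock, the check-and-claim is atomic. A self-contained demonstration with stub nodes (illustrative only; the assertion relies on the short sleep keeping all three claims open at once):

    import random
    import threading
    import time
    from contextlib import contextmanager

    NEMESIS_TARGET_SELECTION_LOCK = threading.Lock()

    class StubNode:
        def __init__(self, name):
            self.name = name
            self.running_nemesis = None

    @contextmanager
    def run_nemesis(node_list, nemesis_label):
        # Mirrors Nemesis.run_nemesis above: check-and-claim is atomic under the lock.
        with NEMESIS_TARGET_SELECTION_LOCK:
            free_nodes = [n for n in node_list if not n.running_nemesis]
            assert free_nodes, f"no free nodes for `{nemesis_label}`"
            node = random.choice(free_nodes)
            node.running_nemesis = nemesis_label
        try:
            yield node
        finally:
            with NEMESIS_TARGET_SELECTION_LOCK:
                node.running_nemesis = None

    nodes = [StubNode(f"node-{i}") for i in range(4)]
    claimed = []

    def disrupt(label):
        with run_nemesis(nodes, label) as node:
            claimed.append(node.name)
            time.sleep(0.1)  # hold the claim so the three selections overlap in time

    threads = [threading.Thread(target=disrupt, args=(f"nemesis-{i}",)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # While all three contexts are open at once, no node can be claimed twice.
    assert len(set(claimed)) == 3, claimed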
sdcm/utils/context_managers.py: 9 changes (0 additions, 9 deletions)
@@ -43,12 +43,3 @@ def nodetool_context(node, start_command, end_command):
         yield result
     finally:
         node.run_nodetool(end_command)
-
-
-@contextmanager
-def run_nemesis(node: 'BaseNode', nemesis_name: str):
-    node.running_nemesis = nemesis_name
-    try:
-        yield node
-    finally:
-        node.running_nemesis = None
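For any remaining callers of the deleted helper, the migration is mechanical: stop pre-selecting a node and hand the candidate list to the new class-level context manager, which selects, tags, and releases the target under the lock. A hedged before/after sketch (`db_cluster` and `check` are hypothetical placeholders):

    # Before: the caller picked target_node itself; the helper only tagged it.
    #     with run_nemesis(node=target_node, nemesis_name="KMS encryption check"):
    #         check(target_node)
    # After: selection happens inside the context manager, under the lock.
    from sdcm.nemesis import Nemesis

    with Nemesis.run_nemesis(node_list=db_cluster.nodes,
                             nemesis_label="KMS encryption check") as target_node:
        check(target_node)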