From 6e5dc1682ac8d50fdf23580ff0a8bc1f97b5c456 Mon Sep 17 00:00:00 2001
From: yarongilor
Date: Tue, 8 Oct 2024 20:25:57 +0300
Subject: [PATCH] test(tablets): split and merge on writes and deletions

Apply a high write load to cause rapid tablet splits, then run a
deletion burst and a major compaction to trigger rapid tablet merges.
---
 .../tablets-split-merge-long-test.jenkinsfile |  12 +
 .../tablets-split-merge-test.jenkinsfile      |  12 +
 sdcm/provision/scylla_yaml/scylla_yaml.py     |   1 +
 sdcm/utils/sstable/sstable_utils.py           |  61 +++++
 sdcm/utils/tablets/common.py                  |  17 ++
 tablets_split_merge_test.py                   | 241 ++++++++++++++++++
 .../tablets-split-merge-long-test.yaml        |  28 ++
 .../tablets/tablets-split-merge-test.yaml     |  28 ++
 unit_tests/test_scylla_yaml.py                |   1 +
 9 files changed, 401 insertions(+)
 create mode 100644 jenkins-pipelines/oss/longevity/tablets-split-merge-long-test.jenkinsfile
 create mode 100644 jenkins-pipelines/oss/longevity/tablets-split-merge-test.jenkinsfile
 create mode 100644 tablets_split_merge_test.py
 create mode 100644 test-cases/features/tablets/tablets-split-merge-long-test.yaml
 create mode 100644 test-cases/features/tablets/tablets-split-merge-test.yaml

diff --git a/jenkins-pipelines/oss/longevity/tablets-split-merge-long-test.jenkinsfile b/jenkins-pipelines/oss/longevity/tablets-split-merge-long-test.jenkinsfile
new file mode 100644
index 00000000000..7e4902aede0
--- /dev/null
+++ b/jenkins-pipelines/oss/longevity/tablets-split-merge-long-test.jenkinsfile
@@ -0,0 +1,12 @@
+#!groovy
+
+// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
+def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)
+
+longevityPipeline(
+    backend: 'aws',
+    region: 'eu-west-1',
+    test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
+    test_config: 'test-cases/features/tablets/tablets-split-merge-long-test.yaml'
+
+)
diff --git a/jenkins-pipelines/oss/longevity/tablets-split-merge-test.jenkinsfile b/jenkins-pipelines/oss/longevity/tablets-split-merge-test.jenkinsfile
new file mode 100644
index 00000000000..94d829d9913
--- /dev/null
+++ b/jenkins-pipelines/oss/longevity/tablets-split-merge-test.jenkinsfile
@@ -0,0 +1,12 @@
+#!groovy
+
+// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
+def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)
+
+longevityPipeline(
+    backend: 'aws',
+    region: 'eu-west-1',
+    test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
+    test_config: 'test-cases/features/tablets/tablets-split-merge-test.yaml'
+
+)
diff --git a/sdcm/provision/scylla_yaml/scylla_yaml.py b/sdcm/provision/scylla_yaml/scylla_yaml.py
index 4d3845395e2..0bf2125d16c 100644
--- a/sdcm/provision/scylla_yaml/scylla_yaml.py
+++ b/sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -353,6 +353,7 @@ def set_authorizer(cls, authorizer: str):
 
     compaction_collection_items_count_warning_threshold: int = None  # None
 
     enable_tablets: bool = None  # False, but default scylla.yaml for some versions (e.g. 6.0) override it to True
+    target_tablet_size_in_bytes: int = None
     force_gossip_topology_changes: bool = None  # False
     reader_concurrency_semaphore_cpu_concurrency: int = None
diff --git a/sdcm/utils/sstable/sstable_utils.py b/sdcm/utils/sstable/sstable_utils.py
index c0fcc75ab69..ef86a87239b 100644
--- a/sdcm/utils/sstable/sstable_utils.py
+++ b/sdcm/utils/sstable/sstable_utils.py
@@ -41,6 +41,20 @@ def count_tombstones(self):
         self.log.debug('Got %s tombstones for %s', tombstones_num, self.ks_cf)
         return tombstones_num
+
+    def get_cf_dir_files(self):
+        files = []
+        ks_cf_path = self.ks_cf.replace('.', '/')
+        find_cmd = f"find /var/lib/scylla/data/{ks_cf_path}-*/* -maxdepth 1 -type f"
+        files_res = self.db_node.remoter.sudo(find_cmd, verbose=True, ignore_status=True)
+        if files_res.stderr:
+            self.log.debug('Failed to get files for %s. Error: %s', self.ks_cf, files_res.stderr)
+        else:
+            files = files_res.stdout.split()
+
+        self.log.debug('Got %s files', len(files))
+        self.log.debug(f"The files are: {files}")
+        return files
 
     def get_sstables(self, from_minutes_ago: int = 0):
         selected_sstables = []
         ks_cf_path = self.ks_cf.replace('.', '/')
@@ -221,3 +235,50 @@ def get_tombstone_date(tombstone_deletion_info: str) -> datetime.datetime:
         full_deletion_date = f'{deletion_date} {deletion_hour}:{deletion_minutes}:{deletion_seconds}'
         full_deletion_date_datetime = datetime.datetime.strptime(full_deletion_date, '%Y-%m-%d %H:%M:%S')
         return full_deletion_date_datetime
+
+    def _get_file_sizes_in_mb(self, list_files):
+        """
+        Calculate the size of the given SSTable files in MB on the data node.
+        :param list_files: List of files.
+        :return: List of file sizes in MB.
+        """
+        self.log.debug(f"Getting sizes for files: {list_files}")
+        list_files_size = []
+
+        for file_path in list_files:
+            try:
+                # Use the 'stat' command on the remote node to get the file size in bytes
+                stat_cmd = f"stat -c%s {file_path}"
+                stat_res = self.db_node.remoter.sudo(stat_cmd, verbose=False, ignore_status=True)
+
+                if stat_res.ok:
+                    # Parse the output and convert bytes to MB
+                    file_size_in_bytes = int(stat_res.stdout.strip())
+                    file_size_in_mb = file_size_in_bytes >> 20
+                    list_files_size.append(file_size_in_mb)
+                else:
+                    self.log.error(f"Failed to get size for {file_path}: {stat_res.stderr}")
+            except Exception as e:  # noqa: BLE001
+                self.log.error(f"Error retrieving size for {file_path}: {e}")
+
+        self.log.debug(f"Got {len(list_files_size)} file sizes")
+        self.log.debug(f"The file sizes are: {list_files_size}")
+        return list_files_size
+
+    def get_cf_dir_files_and_sizes(self):
+        """
+        Get all files in the keyspace-table directory and their sizes in MB.
+        :return: Tuple (list_of_files, list_of_sizes_in_mb)
+        """
+        self.log.debug(f"Fetching all files and their sizes from the {self.ks_cf} directory.")
+
+        # Fetch all files
+        files = self.get_cf_dir_files()
+        if not files:
+            self.log.warning("No files found in the keyspace-table directory.")
+            return [], []
+
+        file_sizes = self._get_file_sizes_in_mb(files)
+        self.log.debug(f"Files: {files}, Sizes: {file_sizes}")
+
+        return files, file_sizes
diff --git a/sdcm/utils/tablets/common.py b/sdcm/utils/tablets/common.py
index 778f0b5cdca..f2d0e2a0330 100644
--- a/sdcm/utils/tablets/common.py
+++ b/sdcm/utils/tablets/common.py
@@ -43,3 +43,20 @@ def wait_for_tablets_balanced(node):
                        params={}, timeout=3600, retry=3)
     time.sleep(5)
     LOGGER.info("Tablets are balanced")
+
+
+def get_tablets_count(keyspace: str, session) -> int:
+    """
+    Returns the number of tablets in a keyspace.
+    :param session: CQL session to query with
+    :param keyspace: keyspace name
+    :return: tablets count
+    """
+    cmd = f"select tablet_count from system.tablets where keyspace_name='{keyspace}' ALLOW FILTERING"
+    cql_result = session.execute(cmd)
+    if not cql_result or not cql_result.current_rows:
+        return 0
+    LOGGER.debug(f"Got CQL result of: {cql_result.current_rows}")
+    tablet_count = cql_result.current_rows[0].tablet_count
+    LOGGER.debug('Retrieved a tablets count of: %s for keyspace %s', tablet_count, keyspace)
+    return int(tablet_count)
diff --git a/tablets_split_merge_test.py b/tablets_split_merge_test.py
new file mode 100644
index 00000000000..a6204c67523
--- /dev/null
+++ b/tablets_split_merge_test.py
@@ -0,0 +1,241 @@
+import threading
+import time
+from functools import partial
+
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+
+from longevity_test import LongevityTest
+from sdcm import wait
+from sdcm.sct_events import Severity
+from sdcm.sct_events.system import InfoEvent
+from sdcm.utils.common import ParallelObject
+from sdcm.utils.sstable.sstable_utils import SstableUtils
+from sdcm.utils.tablets.common import get_tablets_count
+from test_lib.scylla_bench_tools import create_scylla_bench_table_query
+
+KEYSPACE_NAME = 'scylla_bench'
+TABLE_NAME = 'test'
+KS_CF = f"{KEYSPACE_NAME}.{TABLE_NAME}"
+GB_IN_BYTES = 1_073_741_824
+
+
+class TabletsSplitMergeTest(LongevityTest):
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.max_tablets_num = 0
+        self.query_tablets_num_stop_event = threading.Event()
+        self.tablets_num_lock = threading.Lock()  # Lock to protect shared variables
+
+    def _background_query_tablets_num(self):
+        """Background thread to query the tablets number."""
+        while not self.query_tablets_num_stop_event.is_set():
+            try:
+                tablets_num = self._get_tablets_number()
+                with self.tablets_num_lock:
+                    self.max_tablets_num = max(self.max_tablets_num, tablets_num)
+                    self.log.debug(f"Updated max tablets number: {self.max_tablets_num}")
+            except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
+                self.log.error(f"Error in background tablet query: {e}")
+            time.sleep(15)  # Wait 15 seconds before next query
+
+    def _get_tablets_number(self) -> int:
+        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
+            tablets_number = get_tablets_count(keyspace=KEYSPACE_NAME, session=session)
+        self.log.debug(f"Tablets number for {KEYSPACE_NAME} is: {tablets_number}")
+        return tablets_number
+
+    def _update_target_tablet_size(self, target_tablet_size_in_bytes: int):
+        for node in self.db_cluster.nodes:
+            self.log.info(f"Updating {node} with new target_tablet_size_in_bytes = {target_tablet_size_in_bytes}")
+            append_scylla_yaml = {"target_tablet_size_in_bytes": target_tablet_size_in_bytes}
+            with node.remote_scylla_yaml() as scylla_yaml:
+                scylla_yaml.update(append_scylla_yaml)
+            self.log.debug(f"Restarting node {node} to apply new target_tablet_size_in_bytes")
+            node.restart_scylla_server()
+
+    def _pre_create_large_partitions_schema(self):
+        self.run_pre_create_keyspace()
+        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
+            session.execute(create_scylla_bench_table_query())
+
+    def _start_stress_if_not_running(self, stress_queue, stress_params):
+        if self._is_stress_finished(stress_queue=stress_queue):
+            InfoEvent(message="The stress is not running. Rerunning it.").publish()
+            self._run_all_stress_cmds(stress_queue, stress_params)
+
+    def test_tablets_split_merge(self):  # pylint: disable=too-many-locals  # noqa: PLR0914
+        """
+        (1) Write the initial schema and get the initial number of tablets.
+        (2) Start write and read stress simultaneously.
+        (3) Wait for a tablets split.
+        (4) Wait for stress completion.
+        (5) Run deletions, a nodetool flush and a major compaction.
+        (6) Wait for a tablets merge following the shrunk dataset size.
+        (7) Repeat such cycles a few more times.
+        """
+
+        # (1) initiate schema.
+        InfoEvent(message="Create a keyspace with an initial tablets value").publish()
+        self._pre_create_large_partitions_schema()
+        stress_cmd = self.params.get('stress_cmd')
+        stress_read_cmd = self.params.get('stress_read_cmd')
+        deletion_percentage = 96  # Percentage of the dataset partitions to delete.
+
+        # Run prepare stress
+        self.run_prepare_write_cmd()
+
+        # Run Read background stress
+        stress_read_queue = []
+        read_params = {'keyspace_num': 1, 'stress_cmd': stress_read_cmd}
+        self._run_all_stress_cmds(stress_read_queue, read_params)
+        self.node1 = self.db_cluster.nodes[0]
+        sstable_utils = SstableUtils(db_node=self.node1, ks_cf=KS_CF)
+        InfoEvent(message="Starting background thread to query and set max tablets number.").publish()
+        background_thread = threading.Thread(target=self._background_query_tablets_num, daemon=True)
+        background_thread.start()
+        # Run cycles of writes and deletions
+        cycles_num = 7
+        for cycle in range(cycles_num):
+            with self.tablets_num_lock:
+                initial_tablets_num = self.max_tablets_num = self._get_tablets_number()
+            InfoEvent(
+                message=f"Cycle {cycle}, initial tablets number before stress is: {initial_tablets_num}").publish()
+            InfoEvent(message=f"Starting write load: {stress_cmd}").publish()
+            stress_queue = []
+            self.assemble_and_run_all_stress_cmd(stress_queue=stress_queue, stress_cmd=stress_cmd, keyspace_num=1)
+
+            self._start_stress_if_not_running(stress_read_queue, read_params)
+
+            InfoEvent(message="Wait for write stress to finish (if running)").publish()
+            for stress in stress_queue:
+                self.verify_stress_thread(cs_thread_pool=stress)
+
+            self._start_stress_if_not_running(stress_read_queue, read_params)
+            InfoEvent(message="Start deletions to trigger a tablets merge.").publish()
+            self.delete_partitions_in_batch(deletion_percentage)
+
+            InfoEvent(message=f"Run a flush for {KEYSPACE_NAME} on nodes").publish()
+            triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {KEYSPACE_NAME}", )
+                        for node in self.db_cluster.data_nodes]
+            ParallelObject(objects=triggers, timeout=2400).call_objects()
+
+            InfoEvent(
+                message=f"Checking sstable sizes of {sstable_utils.ks_cf} before major compaction").publish()
+            sstable_utils.get_cf_dir_files_and_sizes()
+            InfoEvent(message=f"Run a major compaction for {KEYSPACE_NAME} on nodes").publish()
+            triggers = [partial(node.run_nodetool, sub_cmd="compact", args=f"{KEYSPACE_NAME} {TABLE_NAME}", ) for
+                        node in self.db_cluster.data_nodes]
+            ParallelObject(objects=triggers, timeout=2400).call_objects()
+
+            InfoEvent(message=f"Checking sstable sizes of {sstable_utils.ks_cf} after major compaction").publish()
+            sstable_utils.get_cf_dir_files_and_sizes()
+            self._wait_for_tablet_split(tablets_num=initial_tablets_num)
+            self.log.debug(f"Final max tablets number: {self.max_tablets_num}")
+
+            if self.max_tablets_num <= initial_tablets_num:
+                InfoEvent(
+                    message=f"The maximum number of tablets [{self.max_tablets_num}] is not bigger than the initial number [{initial_tablets_num}].",
+                    severity=Severity.ERROR).publish()
+            InfoEvent(message="Wait for tablets merge after deletion is done.").publish()
+            self._wait_for_tablets_merge()
+
+        self.log.debug("Stopping background thread.")
+        self.query_tablets_num_stop_event.set()
+        background_thread.join()
+
+        InfoEvent(message="Wait for read stress to finish.").publish()
+
+        for stress in stress_read_queue:
+            self.verify_stress_thread(cs_thread_pool=stress)
+
+    def _is_stress_finished(self, stress_queue) -> bool:
+        for index, stress in enumerate(stress_queue):
+            self.log.debug(f"Checking stress task {index + 1}/{len(stress_queue)}: {stress.stress_cmd}")
+            if not all(future.done() for future in stress.results_futures):
+                self.log.debug(f"Stress task {index + 1} is still running.")
+                return False
+            self.log.debug(f"Stress task {index + 1} has completed.")
+        self.log.debug("All stress tasks have finished.")
+        return True
+
+    def _wait_for_tablets_merge(self, timeout_min: int = 15):
+        """
+        Waits for the tablets number to drop below the maximum observed tablets number.
+        """
+        text = "Waiting for a smaller tablets number"
+        res = wait.wait_for(func=lambda: self._get_tablets_number() < self.max_tablets_num, step=60,
+                            text=text, timeout=60 * timeout_min, throw_exc=False)
+        if not res:
+            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()
+
+    def _wait_for_tablet_split(self, tablets_num: int, timeout_min: int = 15):
+        """
+        Waits for a tablets number bigger than tablets_num.
+        """
+        text = f"Waiting for a tablets number bigger than {tablets_num}"
+        res = wait.wait_for(func=lambda: self.max_tablets_num > tablets_num, step=60,
+                            text=text, timeout=60 * timeout_min, throw_exc=False)
+        if not res:
+            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()
+
+    def delete_partitions(self, deletion_percentage: int):
+        """
+        Deletes a percentage of the table's partitions.
+        Example:
+            partition_end_range = 10
+            max_partitions_to_delete = 100
+            deletion_percentage = 70
+            ==> 70 partitions, from 11 through 80, will be deleted.
+        """
+        start_partition = self.partitions_attrs.partition_end_range + 1
+        max_partitions_to_delete = self.partitions_attrs.max_partitions_in_test_table - self.partitions_attrs.partition_end_range
+        num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)
+        end_partition = start_partition + num_of_partitions_to_delete
+
+        self.log.debug(
+            f"Preparing to delete {num_of_partitions_to_delete} partitions from {start_partition} to {end_partition}.")
+        for pkey in range(start_partition, end_partition):
+            delete_query = f"delete from {KS_CF} where pk = {pkey}"
+            self.log.debug(f'delete query: {delete_query}')
+            try:
+                with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
+                    session.execute(SimpleStatement(delete_query, consistency_level=ConsistencyLevel.QUORUM),
+                                    timeout=3600)
+            except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
+                message = f"Failed to execute delete: {e}"
+                InfoEvent(message=message, severity=Severity.ERROR).publish()
+
+    def delete_partitions_in_batch(self, deletion_percentage: int):
+        """
+        Deletes a percentage of the table's partitions using batched queries.
+        """
+        start_partition = self.partitions_attrs.partition_end_range + 1
+        max_partitions_to_delete = self.partitions_attrs.max_partitions_in_test_table - self.partitions_attrs.partition_end_range
+        num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)
+        end_partition = start_partition + num_of_partitions_to_delete
+        batch_size: int = 1000
+        self.log.debug(
+            f"Preparing to delete {num_of_partitions_to_delete} partitions from {start_partition} to {end_partition} in batches of {batch_size}.")
+        batch = []
+        try:
+            with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
+                for pkey in range(start_partition, end_partition):
+                    delete_query = f"DELETE FROM {KS_CF} WHERE pk = {pkey}"
+                    batch.append(delete_query)
+
+                    # Execute batch when it reaches the batch size
+                    if len(batch) >= batch_size:
+                        batch_query = "BEGIN BATCH\n" + "\n".join(batch) + "\nAPPLY BATCH;"
+                        session.execute(SimpleStatement(
+                            batch_query, consistency_level=ConsistencyLevel.QUORUM), timeout=3600)
+                        batch.clear()  # Clear batch after execution
+
+                # Execute any remaining queries in the batch
+                if batch:
+                    batch_query = "BEGIN BATCH\n" + "\n".join(batch) + "\nAPPLY BATCH;"
+                    session.execute(SimpleStatement(
+                        batch_query, consistency_level=ConsistencyLevel.QUORUM), timeout=3600)
+        except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
+            message = f"Failed to execute batch delete: {e}"
+            InfoEvent(message=message, severity=Severity.ERROR).publish()
diff --git a/test-cases/features/tablets/tablets-split-merge-long-test.yaml b/test-cases/features/tablets/tablets-split-merge-long-test.yaml
new file mode 100644
index 00000000000..02ebc507ccf
--- /dev/null
+++ b/test-cases/features/tablets/tablets-split-merge-long-test.yaml
@@ -0,0 +1,28 @@
+test_duration: 3000
+
+prepare_write_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s -validate-data"
+stress_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=150000 -clustering-row-count=5555 -partition-offset=101 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s"
+stress_read_cmd: "scylla-bench -workload=sequential -mode=read -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=40 -connection-count=40 -consistency-level=quorum -rows-per-request=1 -timeout=30s -retry-number=30 -retry-interval=80ms,1s -iterations 0 -duration=30m"
+
+pre_create_keyspace: "CREATE KEYSPACE IF NOT EXISTS scylla_bench WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} and tablets={'initial':64};"
+
+data_validation: |
+    validate_partitions: true
+    table_name: "scylla_bench.test"
+    primary_key_column: "pk"
+    max_partitions_in_test_table: 150100
+    partition_range_with_data_validation: 0-100
+
+n_db_nodes: 3
+n_loaders: 1
+n_monitor_nodes: 1
+instance_type_db: 'i3.4xlarge'
+
+append_scylla_yaml:
+  target_tablet_size_in_bytes: 268435456  # 256MB
+  enable_tablets: true
+
+user_prefix: 'tablets-split-merge'
+use_mgmt: false
+
+post_prepare_cql_cmds: "ALTER TABLE scylla_bench.test with gc_grace_seconds = 0"
diff --git a/test-cases/features/tablets/tablets-split-merge-test.yaml b/test-cases/features/tablets/tablets-split-merge-test.yaml
new file mode 100644
index 00000000000..65fdab96156
--- /dev/null
+++ b/test-cases/features/tablets/tablets-split-merge-test.yaml
@@ -0,0 +1,28 @@
+test_duration: 240
+
+prepare_write_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s -validate-data"
+stress_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=10000 -clustering-row-count=5555 -partition-offset=101 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s"
+stress_read_cmd: "scylla-bench -workload=sequential -mode=read -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=40 -connection-count=40 -consistency-level=quorum -rows-per-request=1 -timeout=30s -retry-number=30 -retry-interval=80ms,1s -iterations 0 -duration=30m"
+
+pre_create_keyspace: "CREATE KEYSPACE IF NOT EXISTS scylla_bench WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} and tablets={'initial':64};"
+
+data_validation: |
+    validate_partitions: true
+    table_name: "scylla_bench.test"
+    primary_key_column: "pk"
+    max_partitions_in_test_table: 10100
+    partition_range_with_data_validation: 0-100
+
+n_db_nodes: 3
+n_loaders: 1
+n_monitor_nodes: 1
+instance_type_db: 'i3.8xlarge'
+
+append_scylla_yaml:
+  target_tablet_size_in_bytes: 536870912  # 0.5GB
+  enable_tablets: true
+
+user_prefix: 'tablets-split-merge'
+use_mgmt: false
+
+post_prepare_cql_cmds: "ALTER TABLE scylla_bench.test with gc_grace_seconds = 0"
diff --git a/unit_tests/test_scylla_yaml.py b/unit_tests/test_scylla_yaml.py
index 66119fcef4b..ecec0643a68 100644
--- a/unit_tests/test_scylla_yaml.py
+++ b/unit_tests/test_scylla_yaml.py
@@ -407,6 +407,7 @@ def test_scylla_yaml(self):
            'workdir': None,
            'write_request_timeout_in_ms': None,
            'enable_tablets': None,
+            'target_tablet_size_in_bytes': None,
            'force_gossip_topology_changes': None,
            'reader_concurrency_semaphore_cpu_concurrency': None,
        }
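
For reference, a minimal standalone sketch (not part of the patch) of how the CQL used by the new get_tablets_count() helper can be exercised with the Python cassandra-driver. The contact point and keyspace name below are placeholder assumptions, and a tablets-enabled Scylla cluster that exposes system.tablets is assumed.

from cassandra.cluster import Cluster


def tablets_count(session, keyspace: str) -> int:
    # Same query as the helper in sdcm/utils/tablets/common.py: read tablet_count
    # from system.tablets for the given keyspace.
    rows = list(session.execute(
        f"SELECT tablet_count FROM system.tablets WHERE keyspace_name='{keyspace}' ALLOW FILTERING"))
    # Mirror the helper's behaviour: first matching row, or 0 when no rows are returned.
    return int(rows[0].tablet_count) if rows else 0


if __name__ == '__main__':
    cluster = Cluster(['127.0.0.1'])  # placeholder contact point
    session = cluster.connect()
    print(tablets_count(session, 'scylla_bench'))  # keyspace created by the test configs above
    cluster.shutdown()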