test(tablets): split and merge on writes and deletions
A test of high write load that causes rapid tablet splits, followed by a deletion burst and a major compaction to trigger rapid tablet merges.
yarongilor committed Dec 3, 2024
1 parent 63b10fb commit 6e5dc16
Showing 9 changed files with 401 additions and 0 deletions.
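In outline, each cycle of the new test checks that the tablet count grows under heavy write load and shrinks back after the deletion burst and major compaction. A simplified sketch of that pass/fail criterion (the full implementation is in tablets_split_merge_test.py below; the callables here are placeholders, not part of the commit):

def check_split_then_merge(get_tablets_number, run_write_load, run_deletion_burst_and_compaction) -> bool:
    """One cycle: tablets should split under heavy writes and merge back once the dataset shrinks (sketch)."""
    baseline = get_tablets_number()
    run_write_load()                         # heavy write load -> dataset grows -> tablets split
    peak = get_tablets_number()
    run_deletion_burst_and_compaction()      # deletions + major compaction -> dataset shrinks -> tablets merge
    merged = get_tablets_number()
    return peak > baseline and merged < peak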
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
    backend: 'aws',
    region: 'eu-west-1',
    test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
    test_config: 'test-cases/features/tablets/tablets-split-merge-long-test.yaml'
)
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
    backend: 'aws',
    region: 'eu-west-1',
    test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
    test_config: 'test-cases/features/tablets/tablets-split-merge-test.yaml'
)
1 change: 1 addition & 0 deletions sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -353,6 +353,7 @@ def set_authorizer(cls, authorizer: str):
    compaction_collection_items_count_warning_threshold: int = None  # None

    enable_tablets: bool = None  # False, but default scylla.yaml for some versions (e.g. 6.0) override it to True
    target_tablet_size_in_bytes: int = None
    force_gossip_topology_changes: bool = None  # False

    reader_concurrency_semaphore_cpu_concurrency: int = None
61 changes: 61 additions & 0 deletions sdcm/utils/sstable/sstable_utils.py
@@ -41,6 +41,20 @@ def count_tombstones(self):
        self.log.debug('Got %s tombstones for %s', tombstones_num, self.ks_cf)
        return tombstones_num

    def get_cf_dir_files(self):
        """
        List the files under the keyspace-table data directory on the db node.

        :return: list of file paths (empty if the directory could not be listed).
        """
        files = []
        ks_cf_path = self.ks_cf.replace('.', '/')
        find_cmd = f"find /var/lib/scylla/data/{ks_cf_path}-*/* -maxdepth 1 -type f"
        files_res = self.db_node.remoter.sudo(find_cmd, verbose=True, ignore_status=True)
        if files_res.stderr:
            self.log.debug('Failed to get files for %s. Error: %s', self.ks_cf, files_res.stderr)
        else:
            files = files_res.stdout.split()

        self.log.debug('Got %s files: %s', len(files), files)
        return files

    def get_sstables(self, from_minutes_ago: int = 0):
        selected_sstables = []
        ks_cf_path = self.ks_cf.replace('.', '/')
@@ -221,3 +235,50 @@ def get_tombstone_date(tombstone_deletion_info: str) -> datetime.datetime:
        full_deletion_date = f'{deletion_date} {deletion_hour}:{deletion_minutes}:{deletion_seconds}'
        full_deletion_date_datetime = datetime.datetime.strptime(full_deletion_date, '%Y-%m-%d %H:%M:%S')
        return full_deletion_date_datetime

    def _get_file_sizes_in_mb(self, list_files):
        """
        Calculate the sizes of the given SSTable files, in MB, on the data node.

        :param list_files: List of file paths.
        :return: List of file sizes in MB.
        """
        self.log.debug("Getting sizes for files: %s", list_files)
        list_files_size = []

        for file_path in list_files:
            try:
                # Use the 'stat' command on the remote node to get the file size in bytes
                stat_cmd = f"stat -c%s {file_path}"
                stat_res = self.db_node.remoter.sudo(stat_cmd, verbose=False, ignore_status=True)

                if stat_res.ok:
                    # Parse the output and convert bytes to MiB with an integer right shift by 20 bits
                    file_size_in_bytes = int(stat_res.stdout.strip())
                    file_size_in_mb = file_size_in_bytes >> 20
                    list_files_size.append(file_size_in_mb)
                else:
                    self.log.error("Failed to get size for %s: %s", file_path, stat_res.stderr)
            except Exception as e:  # noqa: BLE001
                self.log.error("Error retrieving size for %s: %s", file_path, e)

        self.log.debug("Got %s file sizes: %s", len(list_files_size), list_files_size)
        return list_files_size

    def get_cf_dir_files_and_sizes(self):
        """
        Get all files in the keyspace-table directory and their sizes in MB.

        :return: Tuple (list_of_files, list_of_sizes_in_mb)
        """
        self.log.debug("Fetching all files and their sizes from the %s directory.", self.ks_cf)

        # Fetch all files
        files = self.get_cf_dir_files()
        if not files:
            self.log.warning("No files found in the keyspace-table directory.")
            return [], []

        file_sizes = self._get_file_sizes_in_mb(files)
        self.log.debug("Files: %s, Sizes: %s", files, file_sizes)

        return files, file_sizes
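For illustration, a minimal usage sketch (not from the commit itself): summing the returned sizes gives an approximate on-disk footprint of the table, which the test below samples before and after major compaction. The db_node argument is assumed to be an SCT cluster node object, as in the test.

from sdcm.utils.sstable.sstable_utils import SstableUtils

def table_footprint_mb(db_node, ks_cf: str = "scylla_bench.test") -> int:
    """Approximate on-disk size of a table in MB, summed over the files in its data directory (sketch)."""
    sstable_utils = SstableUtils(db_node=db_node, ks_cf=ks_cf)
    _files, sizes_mb = sstable_utils.get_cf_dir_files_and_sizes()
    return sum(sizes_mb)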
17 changes: 17 additions & 0 deletions sdcm/utils/tablets/common.py
@@ -43,3 +43,20 @@ def wait_for_tablets_balanced(node):
                            params={}, timeout=3600, retry=3)
    time.sleep(5)
    LOGGER.info("Tablets are balanced")


def get_tablets_count(keyspace: str, session) -> int:
    """
    Return the number of tablets in a keyspace.

    :param session: CQL session connected to the cluster.
    :param keyspace: keyspace name to query.
    :return: tablets count as an int (0 if no rows were returned).
    """
    cmd = f"select tablet_count from system.tablets where keyspace_name='{keyspace}' ALLOW FILTERING"
    cql_result = session.execute(cmd)
    if not cql_result or not cql_result.current_rows:
        return 0
    LOGGER.debug("Got CQL result of: %s", cql_result.current_rows)
    tablet_count = cql_result.current_rows[0].tablet_count
    LOGGER.debug('Retrieved a tablets count of: %s for keyspace %s', tablet_count, keyspace)
    return int(tablet_count)
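For illustration, a hedged sketch of how this helper can be polled to detect a tablet split; db_cluster and cql_connection_patient are the SCT objects used by the test below, and the function itself is not part of the commit.

from sdcm.utils.tablets.common import get_tablets_count

def tablets_split_detected(db_cluster, keyspace: str, baseline: int) -> bool:
    """Return True once the keyspace reports more tablets than the given baseline (sketch)."""
    with db_cluster.cql_connection_patient(db_cluster.nodes[0]) as session:
        return get_tablets_count(keyspace=keyspace, session=session) > baseline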
241 changes: 241 additions & 0 deletions tablets_split_merge_test.py
@@ -0,0 +1,241 @@
import threading
import time
from functools import partial

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

from longevity_test import LongevityTest
from sdcm import wait
from sdcm.sct_events import Severity
from sdcm.sct_events.system import InfoEvent
from sdcm.utils.common import ParallelObject
from sdcm.utils.sstable.sstable_utils import SstableUtils
from sdcm.utils.tablets.common import get_tablets_count
from test_lib.scylla_bench_tools import create_scylla_bench_table_query

KEYSPACE_NAME = 'scylla_bench'
TABLE_NAME = 'test'
KS_CF = f"{KEYSPACE_NAME}.{TABLE_NAME}"
GB_IN_BYTES = 1_073_741_824


class TabletsSplitMergeTest(LongevityTest):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_tablets_num = 0
        self.query_tablets_num_stop_event = threading.Event()
        self.tablets_num_lock = threading.Lock()  # Lock to protect shared variables

    def _background_query_tablets_num(self):
        """Background thread to query the tablets number."""
        while not self.query_tablets_num_stop_event.is_set():
            try:
                tablets_num = self._get_tablets_number()
                with self.tablets_num_lock:
                    self.max_tablets_num = max(self.max_tablets_num, tablets_num)
                    self.log.debug(f"Updated max tablets number: {self.max_tablets_num}")
            except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
                self.log.error(f"Error in background tablet query: {e}")
            time.sleep(15)  # Wait 15 seconds before next query

    def _get_tablets_number(self) -> int:
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            tablets_number = get_tablets_count(keyspace=KEYSPACE_NAME, session=session)
        self.log.debug(f"Tablets number for {KEYSPACE_NAME} is: {tablets_number}")
        return tablets_number

    def _update_target_tablet_size(self, target_tablet_size_in_bytes: int):
        for node in self.db_cluster.nodes:
            self.log.info(f"Updating {node} with new target_tablet_size_in_bytes = {target_tablet_size_in_bytes}")
            append_scylla_yaml = {"target_tablet_size_in_bytes": target_tablet_size_in_bytes}
            with node.remote_scylla_yaml() as scylla_yaml:
                scylla_yaml.update(append_scylla_yaml)
            self.log.debug(f"Restarting node {node} to apply new target_tablet_size_in_bytes")
            node.restart_scylla_server()

    def _pre_create_large_partitions_schema(self):
        self.run_pre_create_keyspace()
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            session.execute(create_scylla_bench_table_query())

    def _start_stress_if_not_running(self, stress_queue, stress_params):
        if self._is_stress_finished(stress_queue=stress_queue):
            InfoEvent(message="The stress is not running. Rerunning it.").publish()
            self._run_all_stress_cmds(stress_queue, stress_params)

    def test_tablets_split_merge(self):  # pylint: disable=too-many-locals  # noqa: PLR0914
        """
        (1) Write the initial schema and get the initial number of tablets.
        (2) Start write and read stress simultaneously.
        (3) Wait for tablets to split.
        (4) Wait for stress completion.
        (5) Run deletions, a nodetool flush and a major compaction.
        (6) Wait for tablets to merge, following the shrunk dataset size.
        (7) Repeat with more such cycles.
        """

        # (1) Initiate the schema.
        InfoEvent(message="Create a keyspace with an initial tablets value").publish()
        self._pre_create_large_partitions_schema()
        stress_cmd = self.params.get('stress_cmd')
        stress_read_cmd = self.params.get('stress_read_cmd')
        deletion_percentage = 96  # How many of the dataset's partitions should be deleted.

        # Run prepare stress
        self.run_prepare_write_cmd()

        # Run read background stress
        stress_read_queue = []
        read_params = {'keyspace_num': 1, 'stress_cmd': stress_read_cmd}
        self._run_all_stress_cmds(stress_read_queue, read_params)
        self.node1 = self.db_cluster.nodes[0]
        sstable_utils = SstableUtils(db_node=self.node1, ks_cf=KS_CF)
        InfoEvent(message="Starting background thread to query and set max tablets number.").publish()
        background_thread = threading.Thread(target=self._background_query_tablets_num, daemon=True)
        background_thread.start()
        # Run cycles of writes and deletions
        cycles_num = 7
        for cycle in range(cycles_num):
            with self.tablets_num_lock:
                initial_tablets_num = self.max_tablets_num = self._get_tablets_number()
            InfoEvent(
                message=f"Cycle {cycle}, initial tablets number before stress is: {initial_tablets_num}").publish()
            InfoEvent(message=f"Starting write load: {stress_cmd}").publish()
            stress_queue = []
            self.assemble_and_run_all_stress_cmd(stress_queue=stress_queue, stress_cmd=stress_cmd, keyspace_num=1)

            self._start_stress_if_not_running(stress_read_queue, read_params)

            InfoEvent(message="Wait for write stress to finish (if running)").publish()
            for stress in stress_queue:
                self.verify_stress_thread(cs_thread_pool=stress)

            self._start_stress_if_not_running(stress_read_queue, read_params)
            InfoEvent(message="Start deletions to trigger a tablets merge.").publish()
            self.delete_partitions_in_batch(deletion_percentage)

            InfoEvent(message=f"Run a flush for {KEYSPACE_NAME} on nodes").publish()
            triggers = [partial(node.run_nodetool, sub_cmd=f"flush -- {KEYSPACE_NAME}", )
                        for node in self.db_cluster.data_nodes]
            ParallelObject(objects=triggers, timeout=2400).call_objects()

            InfoEvent(
                message=f"Checking sstable sizes of {sstable_utils.ks_cf} before major compaction").publish()
            sstable_utils.get_cf_dir_files_and_sizes()
            InfoEvent(message=f"Run a major compaction for {KEYSPACE_NAME} on nodes").publish()
            triggers = [partial(node.run_nodetool, sub_cmd="compact", args=f"{KEYSPACE_NAME} {TABLE_NAME}", ) for
                        node in self.db_cluster.data_nodes]
            ParallelObject(objects=triggers, timeout=2400).call_objects()

            InfoEvent(message=f"Checking sstable sizes of {sstable_utils.ks_cf} after major compaction").publish()
            sstable_utils.get_cf_dir_files_and_sizes()
            self._wait_for_tablet_split(tablets_num=initial_tablets_num)
            self.log.debug(f"Final max tablets number: {self.max_tablets_num}")

            if self.max_tablets_num <= initial_tablets_num:
                InfoEvent(
                    message=f"The maximum number of tablets [{self.max_tablets_num}] is not bigger than "
                            f"the initial number [{initial_tablets_num}].",
                    severity=Severity.ERROR).publish()
            InfoEvent(message="Wait for tablets merge after deletion is done.").publish()
            self._wait_for_tablets_merge()

        self.log.debug("Stopping background thread.")
        self.query_tablets_num_stop_event.set()
        background_thread.join()

        InfoEvent(message="Wait for read stress to finish.").publish()

        for stress in stress_read_queue:
            self.verify_stress_thread(cs_thread_pool=stress)

    def _is_stress_finished(self, stress_queue) -> bool:
        for index, stress in enumerate(stress_queue):
            self.log.debug(f"Checking stress task {index + 1}/{len(stress_queue)}: {stress.stress_cmd}")
            if not all(future.done() for future in stress.results_futures):
                self.log.debug(f"Stress task {index + 1} is still running.")
                return False
            self.log.debug(f"Stress task {index + 1} has completed.")
        self.log.debug("All stress tasks have finished.")
        return True

    def _wait_for_tablets_merge(self, timeout_min: int = 15):
        """
        Wait for the tablets number to drop below the maximum observed so far.
        """
        text = "Waiting for a smaller tablets number"
        res = wait.wait_for(func=lambda: self._get_tablets_number() < self.max_tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def _wait_for_tablet_split(self, tablets_num: int, timeout_min: int = 15):
        """
        Wait for the observed maximum tablets number to exceed tablets_num.
        """
        text = f"Waiting for a tablets number bigger than {tablets_num}"
        res = wait.wait_for(func=lambda: self.max_tablets_num > tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def delete_partitions(self, deletion_percentage: int):
        """
        Delete a percentage of the table's partitions.

        Example:
            partition_end_range = 10
            max_partitions_to_delete = 100
            deletion_percentage = 70
            ==> 70 partitions, from partition 11 up to (but not including) 81, will be deleted.
        """
        start_partition = self.partitions_attrs.partition_end_range + 1
        max_partitions_to_delete = self.partitions_attrs.max_partitions_in_test_table - self.partitions_attrs.partition_end_range
        num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)
        end_partition = start_partition + num_of_partitions_to_delete

        self.log.debug(
            f"Preparing to delete {num_of_partitions_to_delete} partitions from {start_partition} to {end_partition}.")
        for pkey in range(start_partition, end_partition):
            delete_query = f"delete from {KS_CF} where pk = {pkey}"
            self.log.debug(f'delete query: {delete_query}')
            try:
                with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
                    session.execute(SimpleStatement(delete_query, consistency_level=ConsistencyLevel.QUORUM),
                                    timeout=3600)
            except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
                message = f"Failed to execute delete: {e}"
                InfoEvent(message=message, severity=Severity.ERROR).publish()

    def delete_partitions_in_batch(self, deletion_percentage: int):
        """
        Delete a percentage of the table's partitions using batched CQL queries.
        """
        start_partition = self.partitions_attrs.partition_end_range + 1
        max_partitions_to_delete = self.partitions_attrs.max_partitions_in_test_table - self.partitions_attrs.partition_end_range
        num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)
        end_partition = start_partition + num_of_partitions_to_delete
        batch_size: int = 1000
        self.log.debug(
            f"Preparing to delete {num_of_partitions_to_delete} partitions from {start_partition} to {end_partition} "
            f"in batches of {batch_size}.")
        batch = []
        try:
            with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
                for pkey in range(start_partition, end_partition):
                    delete_query = f"DELETE FROM {KS_CF} WHERE pk = {pkey}"
                    batch.append(delete_query)

                    # Execute the batch once it reaches the batch size (statements are ';'-separated)
                    if len(batch) >= batch_size:
                        batch_query = "BEGIN BATCH\n" + ";\n".join(batch) + ";\nAPPLY BATCH;"
                        session.execute(SimpleStatement(
                            batch_query, consistency_level=ConsistencyLevel.QUORUM), timeout=3600)
                        batch.clear()  # Clear the batch after execution

                # Execute any remaining queries in the batch
                if batch:
                    batch_query = "BEGIN BATCH\n" + ";\n".join(batch) + ";\nAPPLY BATCH;"
                    session.execute(SimpleStatement(
                        batch_query, consistency_level=ConsistencyLevel.QUORUM), timeout=3600)
        except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
            message = f"Failed to execute batch delete: {e}"
            InfoEvent(message=message, severity=Severity.ERROR).publish()
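As a rough worked illustration of the deletion sizing (values taken from the long-test YAML below; partition_end_range is assumed to be 100, matching partition_range_with_data_validation: 0-100):

# Hypothetical sizing check, not part of the commit.
max_partitions_in_test_table = 150_100   # from the YAML below
partition_end_range = 100                # assumed end of the validated prepare range (0-100)
deletion_percentage = 96                 # as set in test_tablets_split_merge

max_partitions_to_delete = max_partitions_in_test_table - partition_end_range            # 150,000
num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)  # 144,000
print(num_of_partitions_to_delete)  # partitions 101..144100 get deleted every cycle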
28 changes: 28 additions & 0 deletions test-cases/features/tablets/tablets-split-merge-long-test.yaml
@@ -0,0 +1,28 @@
test_duration: 3000

prepare_write_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s -validate-data"
stress_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=150000 -clustering-row-count=5555 -partition-offset=101 -clustering-row-size=1884 -concurrency=80 -connection-count=80 -consistency-level=quorum -rows-per-request=50 -timeout=30s"
stress_read_cmd: "scylla-bench -workload=sequential -mode=read -replication-factor=3 -partition-count=100 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=40 -connection-count=40 -consistency-level=quorum -rows-per-request=1 -timeout=30s -retry-number=30 -retry-interval=80ms,1s -iterations 0 -duration=30m"

pre_create_keyspace: "CREATE KEYSPACE IF NOT EXISTS scylla_bench WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} and tablets={'initial':64};"

data_validation: |
  validate_partitions: true
  table_name: "scylla_bench.test"
  primary_key_column: "pk"
  max_partitions_in_test_table: 150100
  partition_range_with_data_validation: 0-100
n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
instance_type_db: 'i3.4xlarge'

append_scylla_yaml:
  target_tablet_size_in_bytes: 268435456 # 256 MiB
  enable_tablets: true

user_prefix: 'tablets-split-merge'
use_mgmt: false

post_prepare_cql_cmds: "ALTER TABLE scylla_bench.test with gc_grace_seconds = 0"
