Skip to content

Commit

Permalink
test(tablets): split and merge on writes and deletions
Browse files Browse the repository at this point in the history
A high-load test that causes rapid tablet splits,
followed by a deletion burst and a major compaction to trigger rapid tablet merges.
  • Loading branch information
yarongilor committed Oct 10, 2024
1 parent d3b56f6 commit 8c6a9ac
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!groovy

// Jenkins pipeline definition for the tablets split/merge test.
// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
// Loads the 'sct' shared library from the same SCM checkout as this file.
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

// Runs TabletsSplitMergeTest.test_tablets_split_merge on AWS (eu-west-1)
// using the tablets-split-merge-test.yaml test configuration.
longevityPipeline(
backend: 'aws',
region: 'eu-west-1',
test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
test_config: 'test-cases/features/tablets/tablets-split-merge-test.yaml'

)
16 changes: 16 additions & 0 deletions sdcm/utils/tablets/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,19 @@ def wait_for_tablets_balanced(node):
params={}, timeout=3600, retry=3)
time.sleep(5)
LOGGER.info("Tablets are balanced")


def get_tablets_count(keyspace: str, session) -> int:
    """
    Look up the tablet count recorded in ``system.tablets`` for a keyspace.

    Only the first returned row is consulted.
    NOTE(review): presumably ``tablet_count`` is a per-table value repeated on
    every row, so for a keyspace with several tables this is the count of
    whichever table comes back first - confirm against the schema.

    :param keyspace: name of the keyspace to look up.
    :param session: CQL session object exposing ``execute()``.
    :return: the tablet count as an int, or 0 when the query yields no rows.
    """
    query = f"select tablet_count,resize_type from system.tablets where keyspace_name='{keyspace}' ALLOW FILTERING"
    result = session.execute(query)
    rows = result.current_rows if result else None
    if not rows:
        # Nothing recorded for this keyspace (or an empty/None result).
        return 0
    count = rows[0].tablet_count
    LOGGER.debug('Retrieved a tablets count of: %s for keyspace %s', count, keyspace)
    return int(count)
224 changes: 224 additions & 0 deletions tablets_split_merge_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import math
import re

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

from longevity_test import LongevityTest
from sdcm import wait
from sdcm.sct_events import Severity
from sdcm.sct_events.system import InfoEvent
from sdcm.utils.tablets.common import get_tablets_count
from test_lib.scylla_bench_tools import create_scylla_bench_table_query

# Keyspace and table this test operates on (tablet counting, flush/compaction, deletions).
KEYSPACE_NAME = 'scylla_bench'
TABLE_NAME = 'test'
# Fully qualified "<keyspace>.<table>" name used when building CQL statements.
KS_CF = f"{KEYSPACE_NAME}.{TABLE_NAME}"


def extract_sequence_range(stress_cmd):
    """
    Return the value of the ``-partition-count=<N>`` option of a stress command.

    :param stress_cmd: stress command line as a single string.
    :return: the partition count as an int.
    :raises ValueError: when the command has no ``-partition-count=<digits>`` option.
    """
    found = re.search(r"-partition-count=(\d+)", stress_cmd)
    if found is None:
        # Fail loudly instead of guessing a default partition count.
        raise ValueError("partition-count not found in stress_cmd")
    return int(found.group(1))


class TabletsSplitMergeTest(LongevityTest):
    """
    Tablets split/merge test.

    Repeatedly loads the ``scylla_bench.test`` table so that its tablets split,
    then deletes most of the written partitions and runs flush + major
    compaction so that the tablets merge back.
    """

    @staticmethod
    def _extract_int_option(stress_cmd: str, option: str) -> int:
        """
        Return the integer value of a ``-<option>=<digits>`` flag of a stress command.

        :raises ValueError: when the flag is missing from ``stress_cmd``.
        """
        match = re.search(rf"-{re.escape(option)}=(\d+)", stress_cmd)
        if match is None:
            raise ValueError(f"{option} not found in the stress command: {stress_cmd}")
        return int(match.group(1))

    def _parse_stress_command(self, stress_cmd: str) -> tuple:
        """
        Parse a stress command that uses ``n=<N>`` and
        ``-col 'size=FIXED(<S>) n=FIXED(<C>)'`` options.

        :return: tuple of (number of partitions, column size in bytes, number of columns).
        :raises ValueError: when either option is missing from the command.
        """
        match_n = re.search(r'n=(\d+)', stress_cmd)
        if match_n is None:
            raise ValueError("Number of partitions not found in the stress command.")
        num_partitions = int(match_n.group(1))

        match_col = re.search(r"-col 'size=FIXED\((\d+)\) n=FIXED\((\d+)\)'", stress_cmd)
        if match_col is None:
            raise ValueError("Column size and count not found in the stress command.")
        column_size = int(match_col.group(1))  # Size in bytes
        num_columns = int(match_col.group(2))  # Number of columns

        return num_partitions, column_size, num_columns

    def _get_tablets_number(self) -> int:
        """Return the current tablet count of the test keyspace, queried through the first DB node."""
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            tablets_number = get_tablets_count(keyspace=KEYSPACE_NAME, session=session)
        self.log.debug(f"Tablets number for {KEYSPACE_NAME} is: {tablets_number}")
        return tablets_number

    def _get_max_expected_tablets(self, stress_cmd, target_tablet_size_in_bytes) -> int:
        """
        Estimate the number of tablets the written dataset should end up with.

        Example calculation:
            partition_count = 500
            clustering_row_count = 5555
            clustering_row_size = 1884 bytes
            target_tablet_size_in_bytes = 1073741824 bytes
            Total Data Size = 500 x 5555 x 1884 = 5,232,810,000 bytes ~= 5.2GB
            Now, dividing by the target tablet size of 1 GB:
            Max Expected Tablets = 5,232,810,000 / 1,073,741,824 ~= 4.87 tablets
            Thus, the system should expect 4 or 8 tablets based on this setup.

        :param stress_cmd: scylla-bench command holding the -partition-count,
            -clustering-row-count and -clustering-row-size options.
        :param target_tablet_size_in_bytes: configured target size of a single tablet.
        :return: dataset size divided by the target tablet size, rounded down.
        :raises ValueError: when one of the required options is missing.
        """
        partition_count = self._extract_int_option(stress_cmd, "partition-count")
        clustering_row_count = self._extract_int_option(stress_cmd, "clustering-row-count")
        clustering_row_size = self._extract_int_option(stress_cmd, "clustering-row-size")

        total_data_size = partition_count * clustering_row_count * clustering_row_size

        max_expected_tablets = total_data_size // target_tablet_size_in_bytes
        self.log.debug(f"Calculated dataset size: {total_data_size / (1024 ** 3)} GB")
        self.log.debug(f"Maximum expected tablets number: {max_expected_tablets}")
        return max_expected_tablets

    def _update_target_tablet_size(self, target_tablet_size_in_bytes: int):
        """Set ``target_tablet_size_in_bytes`` in scylla.yaml of every DB node, restarting each node in turn."""
        for node in self.db_cluster.nodes:
            self.log.info(f"Updating {node} with new target_tablet_size_in_bytes = {target_tablet_size_in_bytes}")
            append_scylla_yaml = {"target_tablet_size_in_bytes": target_tablet_size_in_bytes}
            with node.remote_scylla_yaml() as scylla_yaml:
                scylla_yaml.update(append_scylla_yaml)
            self.log.debug(f"Restarting node {node} to apply new target_tablet_size_in_bytes")
            node.restart_scylla_server()

    def _pre_create_large_partitions_schema(self):
        """Create the pre-defined keyspace and then the scylla-bench table, before any load runs."""
        self.run_pre_create_keyspace()
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            session.execute(create_scylla_bench_table_query())

    def test_tablets_split_merge(self):  # pylint: disable=too-many-locals  # noqa: PLR0914
        """
        (1) writing initial schema and get initial number of tablets.
        (2) start write and read stress simultaneously.
        (3) wait for tablets split.
        (4) wait for stress completion.
        (5) run deletions, nodetool flush and major compaction.
        (6) wait for tablets merge, following a shrunk dataset size.
        (7) redo more such cycles.
        """

        # (1) initiate schema.
        InfoEvent(message="Create c-s keyspace with tablets initial value").publish()
        self._pre_create_large_partitions_schema()
        stress_cmd = self.params.get('stress_cmd')
        stress_read_cmd = self.params.get('stress_read_cmd')
        append_scylla_yaml = self.params.get('append_scylla_yaml')
        target_tablet_size_in_bytes = append_scylla_yaml['target_tablet_size_in_bytes']
        max_expected_tablets = self._get_max_expected_tablets(stress_cmd, target_tablet_size_in_bytes)
        tablets_number_split_threshold = max_expected_tablets // 2
        deletion_percentage = 70  # How many of dataset partitions should be deleted.
        non_deleted_percentage = 100 - deletion_percentage
        # In case, for example, 70% of data is deleted, then 30% left.
        # Accordingly, it is expected to have about 30% of the tablets + some small extra (10%)
        # example scenario:
        #   max_expected_tablets = 6
        #   deletion_percentage = 70, non_deleted_percentage = 30
        #   tablets_number_merge_threshold = math.ceil(6 * (30 + 10) / 100) = 3
        # Meaning there were 6 tablets, then after 70% of data is deleted, we expect 3 or fewer tablets.
        # The ceil must wrap the whole product: ceil() of the ratio alone is always 1,
        # which would make the threshold equal to max_expected_tablets.
        tablets_number_merge_threshold = math.ceil(max_expected_tablets * (non_deleted_percentage + 10) / 100)

        # Run prepare stress
        self.run_prepare_write_cmd()

        # Run Read background stress
        stress_read_queue = []
        read_params = {'keyspace_num': 1, 'stress_cmd': stress_read_cmd}
        self._run_all_stress_cmds(stress_read_queue, read_params)
        self.node1 = self.db_cluster.nodes[0]

        # Run cycles of writes and deletions
        cycles_num = 3
        for _ in range(cycles_num):
            tablets_num = self._get_tablets_number()
            InfoEvent(message=f"Initial tablets number before stress is: {tablets_num}").publish()
            InfoEvent(message=f"Starting write load: {stress_cmd}").publish()
            stress_queue = []
            self.assemble_and_run_all_stress_cmd(stress_queue=stress_queue, stress_cmd=stress_cmd, keyspace_num=1)

            # Keep the background read load alive for the whole cycle.
            if self._is_stress_finished(stress_queue=stress_read_queue):
                InfoEvent(message="The Read Stress is finished. Rerunning it.").publish()
                self._run_all_stress_cmds(stress_read_queue, read_params)

            InfoEvent(message="Wait for write stress to finish (if running)").publish()
            for stress in stress_queue:
                self.verify_stress_thread(cs_thread_pool=stress)

            InfoEvent(message="Wait for tablets split following writes.").publish()
            self._wait_for_more_tablet_than(tablets_number_split_threshold)

            tablets_num = self._get_tablets_number()
            InfoEvent(message=f"Start deletions to trigger a tablets merge following {tablets_num} tablets.").publish()
            self.delete_partitions(deletion_percentage)
            self.node1.run_nodetool(f"flush -- {KEYSPACE_NAME}")
            self.node1.run_nodetool("compact", args=f"{KEYSPACE_NAME} {TABLE_NAME}")

            InfoEvent(message="Wait for tablets merge after deletion is done.").publish()
            self._wait_for_less_or_equal_tablet_than(tablets_number_merge_threshold)

        InfoEvent(message="Wait for read stress to finish.").publish()

        for stress in stress_read_queue:
            self.verify_stress_thread(cs_thread_pool=stress)

    def _is_stress_finished(self, stress_queue) -> bool:
        """Return True when every future of every stress task in ``stress_queue`` is done."""
        for index, stress in enumerate(stress_queue):
            self.log.debug(f"Checking stress task {index + 1}/{len(stress_queue)}: {stress}")
            if not all(future.done() for future in stress.results_futures):
                self.log.debug(f"Stress task {index + 1} is still running.")
                return False
            self.log.debug(f"Stress task {index + 1} has completed.")
        self.log.debug("All stress tasks have finished.")
        return True

    def _wait_for_less_or_equal_tablet_than(self, tablets_num: int, timeout_min: int = 30):  # TODO: 30
        """
        Poll once a minute until the keyspace has ``tablets_num`` tablets or fewer.

        Does not raise on timeout; publishes an ERROR event instead.
        """
        text = f"Waiting for a tablets number smaller than or equal to {tablets_num}"
        res = wait.wait_for(func=lambda: self._get_tablets_number() <= tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def _wait_for_more_tablet_than(self, tablets_num: int, timeout_min: int = 30):  # TODO: 30
        """
        Poll once a minute until the keyspace has more than ``tablets_num`` tablets.

        Does not raise on timeout; publishes an ERROR event instead.
        """
        text = f"Waiting for a tablets number bigger than {tablets_num}"
        res = wait.wait_for(func=lambda: self._get_tablets_number() > tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def _delete_partition(self, session, pkey: int):
        """Delete a single partition at QUORUM; a failure is reported as an ERROR event, not raised."""
        delete_query = f"delete from {KS_CF} where pk = {pkey}"
        self.log.debug(f'delete query: {delete_query}')
        try:
            session.execute(SimpleStatement(delete_query, consistency_level=ConsistencyLevel.QUORUM),
                            timeout=3600)
        except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
            InfoEvent(message=f"Failed to execute delete: {e}", severity=Severity.ERROR).publish()

    def delete_partitions(self, deletion_percentage: int):
        """
        Deletes a percentage of the table's partitions that follow ``partition_end_range``.

        Example:
            partition_end_range = 10
            max_partitions_to_delete = 100
            deletion_percentage = 70
            ==> 70 partitions, 11 through 80, will be deleted.

        Deletion is best-effort: failures are published as ERROR events and the
        remaining partitions are still attempted.

        :param deletion_percentage: share (0-100) of the deletable partitions to delete.
        """
        start_partition = self.partitions_attrs.partition_end_range + 1
        max_partitions_to_delete = (self.partitions_attrs.max_partitions_in_test_table
                                    - self.partitions_attrs.partition_end_range)
        num_of_partitions_to_delete = int(max_partitions_to_delete * deletion_percentage / 100)
        end_partition = start_partition + num_of_partitions_to_delete  # exclusive

        self.log.debug(
            f"Preparing to delete {num_of_partitions_to_delete} partitions from {start_partition} to {end_partition}.")
        try:
            # One session serves all deletions; a new patient connection per partition is needless overhead.
            with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
                for pkey in range(start_partition, end_partition):
                    self._delete_partition(session, pkey)
        except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
            # Failing to obtain (or close) the session is reported the same way as a failed delete.
            InfoEvent(message=f"Failed to execute delete: {e}", severity=Severity.ERROR).publish()
26 changes: 26 additions & 0 deletions test-cases/features/tablets/tablets-split-merge-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Test configuration for tablets_split_merge_test.TabletsSplitMergeTest.
test_duration: 240

# prepare_write_cmd writes the initial 50 partitions.
# stress_cmd writes 500 more partitions starting at partition offset 51.
# stress_read_cmd reads the 50 prepared partitions in the background for 30 minutes.
prepare_write_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=50 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=10 -connection-count=10 -consistency-level=quorum -rows-per-request=10 -timeout=30s -validate-data"
stress_cmd: "scylla-bench -workload=sequential -mode=write -replication-factor=3 -partition-count=500 -clustering-row-count=5555 -partition-offset=51 -clustering-row-size=1884 -concurrency=20 -connection-count=20 -consistency-level=quorum -rows-per-request=10 -timeout=30s -validate-data"
stress_read_cmd: "scylla-bench -workload=sequential -mode=read -replication-factor=3 -partition-count=50 -clustering-row-count=5555 -clustering-row-size=1884 -concurrency=10 -connection-count=10 -consistency-level=quorum -rows-per-request=10 -timeout=30s -retry-number=30 -retry-interval=80ms,1s -iterations 0 -duration=30m -validate-data"

# The keyspace is created up front with RF=3 and a single initial tablet.
pre_create_keyspace: "CREATE KEYSPACE IF NOT EXISTS scylla_bench WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} and tablets={'initial':1};"

# NOTE(review): presumably the source of the partition range values the test reads when deleting partitions - confirm.
data_validation: |
validate_partitions: true
table_name: "scylla_bench.test"
primary_key_column: "pk"
max_partitions_in_test_table: 550
partition_range_with_data_validation: 0-50
n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
instance_type_db: 'i4i.large'

# target_tablet_size_in_bytes is also read by the test to estimate the expected tablet count.
append_scylla_yaml:
target_tablet_size_in_bytes: 1073741824 # 1GB
enable_tablets: true

user_prefix: 'tablets-split-merge'
use_mgmt: false

0 comments on commit 8c6a9ac

Please sign in to comment.