test(tablets): split and merge on writes and deletions
A test of high load that causes rapid tablet splits, followed by a deletion burst and a major compaction to trigger rapid tablet merges.
yarongilor committed Oct 10, 2024
1 parent d3b56f6 · commit 3e691c3
Showing 4 changed files with 244 additions and 0 deletions.

jenkins-pipelines/oss/longevity/tablets-split-merge-test.jenkinsfile (12 additions, 0 deletions)
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
    backend: 'aws',
    region: 'eu-west-1',
    test_name: 'tablets_split_merge_test.TabletsSplitMergeTest.test_tablets_split_merge',
    test_config: 'test-cases/features/tablets/tablets-split-merge-test.yaml'
)

tablets_split_merge_test.py (194 additions, 0 deletions)
@@ -0,0 +1,194 @@
import math
import re

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

from longevity_test import LongevityTest
from sdcm import wait
from sdcm.sct_events import Severity
from sdcm.sct_events.system import InfoEvent
from sdcm.utils.tablets.common import get_tablets_count

KEYSPACE_NAME = 'keyspace1'
TABLE_NAME = 'standard1'
KS_CF = f"{KEYSPACE_NAME}.{TABLE_NAME}"


def extract_sequence_range(stress_cmd: str) -> int:
    """Extract the sequence range from the stress command."""
    match = re.search(r'-pop seq=\d+\.\.(\d+)', stress_cmd)
    if match:
        return int(match.group(1))
    raise ValueError("Sequence range not found in the stress command.")


class TabletsSplitMergeTest(LongevityTest):

    def _parse_stress_command(self, stress_cmd: str) -> tuple:
        """Parse the stress command to extract the number of partitions, column size, and number of columns."""
        # Extract the 'n' value (number of partitions).
        match_n = re.search(r'n=(\d+)', stress_cmd)
        if match_n:
            num_partitions = int(match_n.group(1))
        else:
            raise ValueError("Number of partitions not found in the stress command.")

        # Extract the column size and the number of columns.
        match_col = re.search(r"-col 'size=FIXED\((\d+)\) n=FIXED\((\d+)\)'", stress_cmd)
        if match_col:
            column_size = int(match_col.group(1))  # Size in bytes
            num_columns = int(match_col.group(2))  # Number of columns
        else:
            raise ValueError("Column size and count not found in the stress command.")

        return num_partitions, column_size, num_columns
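    # Illustrative usage (stress_cmd from tablets-split-merge-test.yaml below):
    # "cassandra-stress write ... n=4500000 ... -col 'size=FIXED(200) n=FIXED(5)' ..."
    # parses to (num_partitions, column_size, num_columns) == (4500000, 200, 5).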

    def _get_tablets_number(self) -> int:
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            tablets_number = get_tablets_count(keyspace=KEYSPACE_NAME, session=session)
            self.log.debug(f"Tablets number for {KEYSPACE_NAME} is: {tablets_number}")
            return tablets_number

    def _get_max_expected_tablets(self, stress_cmd: str, target_tablet_size_in_bytes: int) -> int:
        # Parse the stress command.
        num_partitions, column_size, num_columns = self._parse_stress_command(stress_cmd)
        dataset_size = num_partitions * column_size * num_columns
        max_expected_tablets = math.ceil(dataset_size / target_tablet_size_in_bytes)
        self.log.debug(f"Calculated dataset size: {dataset_size / (1024 ** 3)} GB")
        self.log.debug(f"Maximum expected tablets number: {max_expected_tablets}")
        return max_expected_tablets
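    # Worked example (values from tablets-split-merge-test.yaml below):
    # dataset_size = 4500000 * 200 * 5 = 4.5e9 bytes (~4.19 GiB), so with
    # target_tablet_size_in_bytes = 2**30 the method returns
    # max_expected_tablets = ceil(4.5e9 / 2**30) = 5.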

    def _update_target_tablet_size(self, target_tablet_size_in_bytes: int):
        for node in self.db_cluster.nodes:
            self.log.info(f"Updating {node} with new target_tablet_size_in_bytes = {target_tablet_size_in_bytes}")
            append_scylla_yaml = {"target_tablet_size_in_bytes": target_tablet_size_in_bytes}
            with node.remote_scylla_yaml() as scylla_yaml:
                scylla_yaml.update(append_scylla_yaml)
            self.log.debug(f"Restarting node {node} to apply new target_tablet_size_in_bytes")
            node.restart_scylla_server()

    def test_tablets_split_merge(self):  # pylint: disable=too-many-locals  # noqa: PLR0914
        """
        (1) Write the initial schema and get the initial number of tablets.
        (2) Start write and read stress simultaneously.
        (3) Wait for a tablets split.
        (4) Wait for stress completion.
        (5) Run deletions, a nodetool flush and a major compaction.
        (6) Wait for a tablets merge, following the shrunk dataset size.
        (7) Repeat for more such cycles.
        """

        # (1) Initiate the schema.
        InfoEvent(message="Create c-s keyspace with tablets initial value").publish()
        self.run_pre_create_keyspace()
        self.run_pre_create_schema()
        self.run_prepare_write_cmd()
        stress_cmd = self.params.get('stress_cmd')
        sequence_range = extract_sequence_range(stress_cmd)
        stress_read_cmd = self.params.get('stress_read_cmd')
        append_scylla_yaml = self.params.get('append_scylla_yaml')
        target_tablet_size_in_bytes = append_scylla_yaml['target_tablet_size_in_bytes']
        max_expected_tablets = self._get_max_expected_tablets(stress_cmd, target_tablet_size_in_bytes)
        tablets_number_split_threshold = max_expected_tablets // 2
        deletion_percentage = 70  # How many of the dataset's partitions should be deleted.
        non_deleted_percentage = 100 - deletion_percentage
        # If, for example, 70% of the data is deleted, then 30% is left.
        # Accordingly, about 30% of the tablets are expected to remain, plus a small extra (10%).
        # Example scenario:
        #   max_expected_tablets = 6
        #   deletion_percentage = 70, non_deleted_percentage = 30
        #   tablets_number_merge_threshold = math.ceil(6 * (30 + 10) / 100) = 3
        # Meaning: there were 6 tablets; after 70% of the data is deleted, 3 or fewer tablets are expected.
        tablets_number_merge_threshold = math.ceil(max_expected_tablets * (non_deleted_percentage + 10) / 100)
        stress_read_queue = []
        read_params = {'keyspace_num': 1, 'stress_cmd': stress_read_cmd}
        self._run_all_stress_cmds(stress_read_queue, read_params)
        self.node1 = self.db_cluster.nodes[0]
        cycles_num = 2  # TODO: should be 3? more?
        for cycle in range(cycles_num):
            tablets_num = self._get_tablets_number()
            InfoEvent(message=f"Initial tablets number before stress is: {tablets_num}").publish()
            InfoEvent(message=f"Starting C-S write load: {stress_cmd}").publish()
            stress_queue = []
            self.assemble_and_run_all_stress_cmd(stress_queue=stress_queue, stress_cmd=stress_cmd, keyspace_num=1)

            if self._is_stress_finished(stress_queue=stress_read_queue):
                InfoEvent(message="The read stress is finished. Rerunning it.").publish()
                self._run_all_stress_cmds(stress_read_queue, read_params)

            InfoEvent(message="Wait for the write stress to finish (if running).").publish()
            for stress in stress_queue:
                self.verify_stress_thread(cs_thread_pool=stress)

            InfoEvent(message="Wait for a tablets split following the writes.").publish()
            self._wait_for_more_tablet_than(tablets_number_split_threshold)

            tablets_num = self._get_tablets_number()
            InfoEvent(message=f"Start deletions to trigger a tablets merge, following {tablets_num} tablets.").publish()
            self.delete_partitions(sequence_range, deletion_percentage)
            self.node1.run_nodetool(f"flush -- {KEYSPACE_NAME}")
            self.node1.run_nodetool("compact", args=f"{KEYSPACE_NAME} {TABLE_NAME}")

            InfoEvent(message="Wait for a tablets merge after the deletions are done.").publish()
            self._wait_for_less_or_equal_tablet_than(tablets_number_merge_threshold)

        InfoEvent(message="Wait for the read stress to finish.").publish()

        for stress in stress_read_queue:
            self.verify_stress_thread(cs_thread_pool=stress)

    def _is_stress_finished(self, stress_queue) -> bool:
        for index, stress in enumerate(stress_queue):
            self.log.debug(f"Checking stress task {index + 1}/{len(stress_queue)}: {stress}")
            if not all(future.done() for future in stress.results_futures):
                self.log.debug(f"Stress task {index + 1} is still running.")
                return False
            self.log.debug(f"Stress task {index + 1} has completed.")
        self.log.debug("All stress tasks have finished.")
        return True

    def _wait_for_less_or_equal_tablet_than(self, tablets_num: int, timeout_min: int = 30):
        text = f"Waiting for a tablets number smaller than or equal to {tablets_num}"
        res = wait.wait_for(func=lambda: self._get_tablets_number() <= tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def _wait_for_more_tablet_than(self, tablets_num: int, timeout_min: int = 30):
        text = f"Waiting for a tablets number bigger than {tablets_num}"
        res = wait.wait_for(func=lambda: self._get_tablets_number() > tablets_num, step=60,
                            text=text, timeout=60 * timeout_min, throw_exc=False)
        if not res:
            InfoEvent(message=f"{text} FAILED.", severity=Severity.ERROR).publish()

    def delete_partitions(self, sequence_range: int, deletion_percentage: int, batch_size: int = 1000):
        """
        Deletes a percentage of the table's partitions (from the end of it).
        Example:
            sequence_range = 100
            deletion_percentage = 70
            ==> partitions 30 up to 99 will be deleted.
            ==> with batch_size = 10, the batch deletions would be: 30-39, 40-49, ..., 90-99.
        """
        num_of_partitions_to_delete = int(sequence_range * deletion_percentage / 100)
        start_partition = sequence_range - num_of_partitions_to_delete
        self.log.debug(
            f"Preparing to delete {num_of_partitions_to_delete} partitions of {KS_CF}, starting from {start_partition}.")

        with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
            for start in range(start_partition, sequence_range, batch_size):
                end = min(start + batch_size, sequence_range)
                batch_deletion_query = ["BEGIN BATCH"]
                for pkey in range(start, end):
                    batch_deletion_query.append(f"DELETE FROM {KS_CF} WHERE pk = {pkey};")
                batch_deletion_query.append("APPLY BATCH;")
                batch_deletion_query = " ".join(batch_deletion_query)

                self.log.debug(f"Executing batch delete from {start} to {end - 1}")
                try:
                    session.execute(SimpleStatement(batch_deletion_query, consistency_level=ConsistencyLevel.QUORUM),
                                    timeout=3600)
                except Exception as e:  # pylint: disable=broad-except  # noqa: BLE001
                    message = f"Failed to execute batch delete: {e}"
                    InfoEvent(message=message, severity=Severity.ERROR).publish()
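
The get_tablets_count helper used above is imported from sdcm.utils.tablets.common and is not part of this diff. A minimal sketch of what such a helper could look like, assuming a tablets-enabled Scylla that exposes per-table tablet counts in a system.tablets table with keyspace_name and tablet_count columns (an assumption; the actual SCT implementation may differ):

def get_tablets_count(keyspace: str, session) -> int:
    # Hypothetical sketch, not the SCT implementation: sum the tablet_count
    # of every table belonging to the given keyspace.
    rows = session.execute(
        f"SELECT tablet_count FROM system.tablets "
        f"WHERE keyspace_name = '{keyspace}' ALLOW FILTERING")
    return sum(row.tablet_count for row in rows)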

test-cases/features/tablets/tablets-split-merge-test.yaml (22 additions, 0 deletions)
@@ -0,0 +1,22 @@
test_duration: 7200

prepare_write_cmd: "cassandra-stress write no-warmup cl=ALL n=1500000 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=200 -col 'size=FIXED(200) n=FIXED(5)' -pop seq=1..1500000"
# writes 4.5GB (the total dataset reaches 6GB together with the 1.5GB prepare phase)
stress_cmd: "cassandra-stress write no-warmup cl=QUORUM n=4500000 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=200 -col 'size=FIXED(200) n=FIXED(5)' -pop seq=1500000..6000000"
# reads the 1.5GB prepared dataset
stress_read_cmd: "cassandra-stress read cl=QUORUM duration=30m -mode cql3 native -rate threads=10 -pop seq=1..1500000 -log interval=5 -col 'size=FIXED(200) n=FIXED(5)'"

n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
instance_type_db: 'i4i.large'

append_scylla_yaml:
  target_tablet_size_in_bytes: 1073741824  # 1GB
  enable_tablets: true

user_prefix: 'tablets-split-merge'
use_mgmt: false

pre_create_keyspace: "CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'} and tablets={'initial':1};"
pre_create_schema: true
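
Putting the numbers of this config through the test's threshold math gives a sense of the expected scale. A worked check, using only values from this YAML and the test code above (live numbers also depend on replication and on-disk compression):

import math

# stress_cmd above: 4,500,000 partitions x 5 columns x 200 bytes each.
dataset_size = 4_500_000 * 200 * 5           # 4.5e9 bytes, ~4.19 GiB
target_tablet_size_in_bytes = 1_073_741_824  # 1 GiB, from append_scylla_yaml
max_expected_tablets = math.ceil(dataset_size / target_tablet_size_in_bytes)        # 5
tablets_number_split_threshold = max_expected_tablets // 2                          # 2
# 70% of the partitions get deleted, leaving 30%, plus a 10% margin:
tablets_number_merge_threshold = math.ceil(max_expected_tablets * (30 + 10) / 100)  # 2
print(max_expected_tablets, tablets_number_split_threshold, tablets_number_merge_threshold)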