Skip to content

Commit

Permalink
test(tablets): split and merge on writes and deletes
Browse files Browse the repository at this point in the history
A test of high load, causing rapid tablet splits.
Then have a deletion burst and a major compaction to trigger merges.
  • Loading branch information
yarongilor committed Oct 9, 2024
1 parent d3b56f6 commit d09408f
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
17 changes: 17 additions & 0 deletions sdcm/utils/tablets/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,20 @@ def wait_for_tablets_balanced(node):
params={}, timeout=3600, retry=3)
time.sleep(5)
LOGGER.info("Tablets are balanced")


def get_tablets_count(keyspace: str, session) -> int:
    """
    Return the tablet count recorded in ``system.tablets`` for a keyspace.

    :param keyspace: a bare keyspace name (``"keyspace1"``) or a qualified
        ``"keyspace.table"`` name; only the keyspace part is used in the query.
    :param session: CQL session object exposing ``execute(query)``.
    :return: ``tablet_count`` of the first returned row, or 0 when the query
        yields no result / no rows.
    """
    # str.partition never raises, whereas unpacking str.split('.') into two
    # names fails with ValueError when no table part is given.
    keyspace_name = keyspace.partition('.')[0]
    cmd = f"select tablet_count,resize_type from system.tablets where keyspace_name='{keyspace_name}' ALLOW FILTERING"
    cql_result = session.execute(cmd)
    if not cql_result or not cql_result.current_rows:
        return 0
    # Only the first row is inspected.
    # NOTE(review): presumably one row set per table; with several tablet-enabled
    # tables in the keyspace this reports just one of them - confirm.
    tablet_count = cql_result.current_rows[0].tablet_count
    LOGGER.debug('Retrieved a tablets count of: %s for keyspace %s', tablet_count, keyspace_name)
    return int(tablet_count)
120 changes: 120 additions & 0 deletions tablets_split_merge_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import re

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

from longevity_test import LongevityTest
from sdcm import wait
from sdcm.sct_events import Severity
from sdcm.sct_events.system import InfoEvent
from sdcm.utils.tablets.common import get_tablets_count

# Keyspace and table exercised by this test.
KEYSPACE_NAME = "keyspace1"
TABLE_NAME = "standard1"
# Qualified "<keyspace>.<table>" name used when building CQL statements.
KS_CF = ".".join((KEYSPACE_NAME, TABLE_NAME))


def extract_sequence_range(stress_cmd: str) -> int:
    """Return the upper bound of the ``-pop seq=<start>..<end>`` range of a stress command.

    Both the two-dot form (``seq=1..6000000``) and the three-dot form
    (``seq=1...6000000``) are accepted.

    :param stress_cmd: full stress command line.
    :return: the ``<end>`` value of the sequence range.
    :raises ValueError: when the command holds no ``-pop seq=`` range.
    """
    # \.{2,3}: the previous pattern required exactly three dots and therefore
    # never matched a two-dot range.
    match = re.search(r'-pop\s+seq=\d+\.{2,3}(\d+)', stress_cmd)
    if match is None:
        raise ValueError("Sequence range not found in the stress command.")
    return int(match.group(1))


class TabletsSplitMergeTest(LongevityTest):
    """Trigger tablet splits with a write load, then tablet merges with deletions and a compaction."""

    def _get_tablets_number(self) -> int:
        """Fetch the current tablets count, publish it as an InfoEvent and return it."""
        with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
            # Pass the qualified "<keyspace>.<table>" name: the helper derives the
            # keyspace by splitting its argument on '.'.
            tablets_number = get_tablets_count(keyspace=KS_CF, session=session)
        InfoEvent(message=f"Tablets number for {KEYSPACE_NAME} is: {tablets_number}").publish()
        return tablets_number

    def _update_target_tablet_size(self, target_tablet_size_in_bytes: int):
        """Set ``target_tablet_size_in_bytes`` in scylla.yaml of every node and restart Scylla on it.

        Nodes are handled one after the other: update the yaml, then restart.
        """
        for node in self.db_cluster.nodes:
            self.log.info("Updating %s with new target_tablet_size_in_bytes = %s", node, target_tablet_size_in_bytes)
            append_scylla_yaml = {"target_tablet_size_in_bytes": target_tablet_size_in_bytes}
            with node.remote_scylla_yaml() as scylla_yaml:
                scylla_yaml.update(append_scylla_yaml)
            self.log.debug("Restarting node %s to apply new target_tablet_size_in_bytes", node)
            node.restart_scylla_server()

    def test_tablets_split_merge(self):  # pylint: disable=too-many-locals  # noqa: PLR0914
        """
        (1) writing initial schema and get initial number of tablets.
        (2) start write and read stress simultaneously.
        (3) wait for tablets split.
        (4) wait for stress completion.
        (5) run deletions, nodetool flush and major compaction.
        (6) wait for tablets merge, following a shrunk dataset size.
        (7) redo more such cycles.
        """

        # (1) initiate schema.
        InfoEvent(message="Create c-s keyspace with tablets initial value").publish()
        self.run_pre_create_keyspace()
        stress_cmd = self.params.get('stress_cmd')
        sequence_range = extract_sequence_range(stress_cmd)
        stress_read_cmd = self.params.get('stress_read_cmd')
        stress_read_queue = []
        read_params = {'keyspace_num': 1, 'stress_cmd': stress_read_cmd}
        # The read stress is started once, before the cycles, and verified after them.
        self._run_all_stress_cmds(stress_read_queue, read_params)
        # delete_partitions() relies on self.node1 being set here.
        self.node1 = self.db_cluster.nodes[0]
        cycles_num = 2  # TODO: should be 3? more?
        for _ in range(cycles_num):
            # Baseline taken before the write load; a split must exceed it.
            tablets_num = self._get_tablets_number()
            InfoEvent(message=f"Starting C-S write load: {stress_cmd}").publish()
            stress_queue = []
            self.assemble_and_run_all_stress_cmd(stress_queue=stress_queue, stress_cmd=stress_cmd, keyspace_num=1)

            InfoEvent(message="Wait for tablets split following writes.").publish()
            self._wait_for_more_tablet_than(tablets_num)

            InfoEvent(message="Wait for write stress to finish (if running)").publish()
            for stress in stress_queue:
                self.verify_stress_thread(cs_thread_pool=stress)

            # New baseline taken after the writes; a merge must go below it.
            tablets_num = self._get_tablets_number()
            InfoEvent(message="Start deletions to trigger a tablets merge.").publish()
            self.delete_partitions(sequence_range)
            self.node1.run_nodetool(f"flush -- {KEYSPACE_NAME}")
            self.node1.run_nodetool("compact", args=f"{KEYSPACE_NAME} {TABLE_NAME}")

            InfoEvent(message="Wait for tablets merge after deletion is done.").publish()
            self._wait_for_less_tablet_than(tablets_num)

        InfoEvent(message="Wait for read stress to finish.").publish()
        for stress in stress_read_queue:
            self.verify_stress_thread(cs_thread_pool=stress)

    def _wait_for_less_tablet_than(self, tablets_num: int, timeout_min: int = 30):
        """Wait until the tablets count is smaller than ``tablets_num``.

        :param tablets_num: reference tablets count.
        :param timeout_min: timeout in minutes, handed to ``wait.wait_for`` as seconds.
        """
        text = f"Waiting for a tablets number smaller than {tablets_num}"
        # throw_exc=True: presumably raises when the timeout expires - confirm in sdcm.wait.
        wait.wait_for(func=lambda: self._get_tablets_number() < tablets_num, step=60,
                      text=text, timeout=60 * timeout_min, throw_exc=True)

    def _wait_for_more_tablet_than(self, tablets_num: int, timeout_min: int = 30):
        """Wait until the tablets count is bigger than ``tablets_num``.

        :param tablets_num: reference tablets count.
        :param timeout_min: timeout in minutes, handed to ``wait.wait_for`` as seconds.
        """
        text = f"Waiting for a tablets number bigger than {tablets_num}"
        # throw_exc=True: presumably raises when the timeout expires - confirm in sdcm.wait.
        wait.wait_for(func=lambda: self._get_tablets_number() > tablets_num, step=60,
                      text=text, timeout=60 * timeout_min, throw_exc=True)

    def delete_partitions(self, sequence_range: int, batch_size: int = 1000):
        """Delete the keys ``[sequence_range // 2, sequence_range)`` of the table in CQL batches.

        A failing batch is reported as an ERROR InfoEvent and the loop goes on
        with the next batch.

        :param sequence_range: upper bound of the written key sequence.
        :param batch_size: number of DELETE statements per batch.
        """
        num_of_partitions_to_delete = sequence_range // 2
        self.log.debug("Preparing to delete %s partitions of %s", num_of_partitions_to_delete, KS_CF)

        with self.db_cluster.cql_connection_patient(self.node1, connect_timeout=300) as session:
            # NOTE(review): the upper bound is exclusive, so key == sequence_range is
            # never deleted although the stress range is 1..sequence_range - confirm intended.
            for start in range(num_of_partitions_to_delete, sequence_range, batch_size):
                end = min(start + batch_size, sequence_range)
                # NOTE(review): assumes the table has an integer partition key column
                # named `pk` - verify against the schema the stress tool creates.
                deletions = [f"DELETE FROM {KS_CF} WHERE pk = {pkey};" for pkey in range(start, end)]
                batch_deletion_query = " ".join(["BEGIN BATCH", *deletions, "APPLY BATCH;"])

                self.log.debug("Executing batch delete from %s to %s", start, end - 1)
                try:
                    session.execute(SimpleStatement(batch_deletion_query, consistency_level=ConsistencyLevel.QUORUM),
                                    timeout=3600)
                except Exception as e:  # pylint: disable=broad-except
                    # Deliberate best effort: report the failure and keep deleting.
                    message = f"Failed to execute batch delete: {e}"
                    InfoEvent(message=message, severity=Severity.ERROR).publish()
21 changes: 21 additions & 0 deletions test-cases/features/tablets_split_merge_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
test_duration: 7200

#prepare_write_cmd: "cassandra-stress write no-warmup cl=ALL n=100 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=200 -col 'size=FIXED(200) n=FIXED(5)' -pop seq=1..100"
# write 6GB
stress_cmd: "cassandra-stress write no-warmup cl=ALL n=6000000 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=200 -col 'size=FIXED(200) n=FIXED(5)' -pop seq=1..6000000"
# read 3GB (3M of the 6M written rows, i.e. the first half of the key range)
stress_read_cmd: "cassandra-stress read cl=QUORUM n=3000000 -schema 'replication(strategy=NetworkTopologyStrategy,replication_factor=3)' -mode cql3 native -rate threads=200 -pop seq=1..3000000"

n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
instance_type_db: 'i4i.large'

append_scylla_yaml:
target_tablet_size_in_bytes: 1073741824 # 1GB
enable_tablets: true

user_prefix: 'tablets-split-merge'
use_mgmt: false

pre_create_keyspace: "CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': '3'} and tablets={'initial':1};"

0 comments on commit d09408f

Please sign in to comment.