diff --git a/tests/scripts/helpers/kruize.py b/tests/scripts/helpers/kruize.py index bcae2bbe5..6e2a073ff 100644 --- a/tests/scripts/helpers/kruize.py +++ b/tests/scripts/helpers/kruize.py @@ -22,6 +22,8 @@ def form_kruize_url(cluster_type, SERVER_IP=None): global URL + KIND_IP="127.0.0.1" + KRUIZE_PORT=8080 if SERVER_IP != None: URL = "http://" + str(SERVER_IP) @@ -38,7 +40,8 @@ def form_kruize_url(cluster_type, SERVER_IP=None): ip = subprocess.run(['minikube ip'], shell=True, stdout=subprocess.PIPE) SERVER_IP = ip.stdout.decode('utf-8').strip('\n') URL = "http://" + str(SERVER_IP) + ":" + str(AUTOTUNE_PORT) - + elif (cluster_type == "kind"): + URL = "http://" + KIND_IP + ":" + str(KRUIZE_PORT) elif (cluster_type == "openshift"): subprocess.run(['oc expose svc/kruize -n openshift-tuning'], shell=True, stdout=subprocess.PIPE) @@ -50,6 +53,33 @@ def form_kruize_url(cluster_type, SERVER_IP=None): URL = "http://" + str(SERVER_IP) print("\nKRUIZE AUTOTUNE URL = ", URL) +# Description: This function invokes the Kruize bulk service API +# Input Parameters: bulk input json file +def bulk(bulk_json_file): + json_file = open(bulk_json_file, "r") + bulk_json = json.loads(json_file.read()) + + #print("\nInvoking bulk service...") + url = URL + "/bulk" + print("URL = ", url) + + response = requests.post(url, json=bulk_json) + return response + +# Description: This function fetches the status of a Kruize bulk job +# Input Parameters: job id returned from bulk service +def get_bulk_job_status(job_id, verbose = None): + #print("\nGet the bulk job status for job id %s " % (job_id)) + queryString = "?" + if job_id: + queryString = queryString + "job_id=%s" % (job_id) + if verbose: + queryString = queryString + "&verbose=%s" % (verbose) + + url = URL + "/bulk%s" % (queryString) + #print("URL = ", url) + response = requests.get(url) + return response # Description: This function validates the input json and posts the experiment using createExperiment API to Kruize Autotune # Input Parameters: experiment input json @@ -164,10 +194,10 @@ def list_recommendations(experiment_name=None, latest=None, monitoring_end_time= print("PARAMS = ", PARAMS) response = requests.get(url=url, params=PARAMS) - print("Response status code = ", response.status_code) - print("\n************************************************************") - print(response.text) - print("\n************************************************************") + #print("Response status code = ", response.status_code) + #print("\n************************************************************") + #print(response.text) + #print("\n************************************************************") return response diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_input.json b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_input.json new file mode 100644 index 000000000..c20b762c0 --- /dev/null +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_input.json @@ -0,0 +1,22 @@ +{ + "filter": { + "exclude": { + "namespace": [], + "workload": [], + "containers": [], + "labels": {} + }, + "include": { + "namespace": [], + "workload": [], + "containers": [], + "labels": {} + } + }, + "datasource": "thanos", + "time_range": { + "start": "2024-09-01T00:00:00.000Z", + "end": "2024-09-11T00:00:00.000Z" + } +} + diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py new file mode 100644 index 000000000..0a2b745ec --- /dev/null +++ 
b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.py @@ -0,0 +1,351 @@ +import requests +import argparse +import json +import os +import subprocess +from time import sleep +from concurrent.futures import ThreadPoolExecutor, as_completed +import sys +sys.path.append("../../") +from helpers.kruize import * +from helpers.utils import * +import time +import logging +from datetime import datetime, timedelta + +def setup_logger(name, log_file, level=logging.INFO): + + handler = logging.FileHandler(log_file) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + +def invoke_bulk(worker_number, resultsdir): + try: + + scale_log_dir = resultsdir + "/scale_logs" + os.makedirs(scale_log_dir, exist_ok=True) + + log_file = f"{scale_log_dir}/worker_{worker_number}.log" + logger = setup_logger(f"logger_{worker_number}", log_file) + + bulk_response = bulk("./bulk_input.json") + + # Obtain the job id from the response from bulk service + job_id_json = bulk_response.json() + + job_id = job_id_json['job_id'] + logger.info(f"worker number - {worker_number} job id - {job_id}") + + # Get the bulk job status using the job id + verbose = "true" + bulk_job_response = get_bulk_job_status(job_id, verbose) + job_status_json = bulk_job_response.json() + + # Loop until job status is COMPLETED + job_status = job_status_json['status'] + + while job_status != "COMPLETED": + bulk_job_response = get_bulk_job_status(job_id, verbose) + job_status_json = bulk_job_response.json() + job_status = job_status_json['status'] + if job_status == "FAILED": + logger.info("Job FAILED!") + break + sleep(5) + + logger.info(f"worker number - {worker_number} job id - {job_id} job status - {job_status}") + + # Dump the job status json into a file + job_status_dir = resultsdir + "/job_status_jsons" + os.makedirs(job_status_dir, exist_ok=True) + + job_file = job_status_dir + "/job_status" + str(worker_number) + ".json" + logger.info(f"Storing job status in {job_file}") + with open(job_file, 'w') as f: + json.dump(job_status_json, f, indent=4) + + # Fetch the list of experiments for which recommendations are available + logger.info(f"Fetching processed experiments...") + exp_list = list(job_status_json["experiments"].keys()) + + logger.info(f"List of processed experiments") + logger.info(f"**************************************************") + logger.info(exp_list) + logger.info(f"**************************************************") + + # List recommendations for the experiments for which recommendations are available + recommendations_json_arr = [] + + if exp_list != "": + list_reco_failures = 0 + for exp_name in exp_list: + + logger.info(f"Fetching recommendations for {exp_name}...") + list_reco_response = list_recommendations(exp_name) + if list_reco_response.status_code != 200: + list_reco_failures = list_reco_failures + 1 + logger.info(f"List recommendations failed for the experiment - {exp_name}!") + reco = list_reco_response.json() + logger.info(reco) + continue + else: + logger.info(f"Fetched recommendations for {exp_name} - Done") + + reco = list_reco_response.json() + recommendations_json_arr.append(reco) + + # Dump the recommendations into a json file + reco_dir = resultsdir + "/recommendation_jsons" + os.makedirs(reco_dir, exist_ok=True) + reco_file = reco_dir + "/recommendations" + str(worker_number) + ".json" + with open(reco_file, 'w') as f: + 
json.dump(recommendations_json_arr, f, indent=4) + + if list_reco_failures != 0: + logger.info(f"List recommendations failed for some of the experiments, check the log {log_file} for details!") + return -1 + else: + return 0 + else: + logger.error("Something went wrong! There are no experiments with recommendations!") + return -1 + except Exception as e: + return {'error': str(e)} + + +def invoke_bulk_with_time_range(worker_number, resultsdir, start_time, end_time): + try: + + scale_log_dir = resultsdir + "/scale_logs" + os.makedirs(scale_log_dir, exist_ok=True) + + log_file = f"{scale_log_dir}/worker_{worker_number}.log" + logger = setup_logger(f"logger_{worker_number}", log_file) + + # Update the bulk json with start & end time + logger.info(f"worker number = {worker_number}") + logger.info(f"start time = {start_time}") + logger.info(f"end time = {end_time}") + + # Update time range in the bulk input json + bulk_input_json_file = "./bulk_input.json" + data = json.load(open(bulk_input_json_file)) + data['time_range']['start'] = start_time + data['time_range']['end'] = end_time + + logger.info(data) + + tmp_file = "/tmp/bulk_input_" + str(worker_number) + ".json" + with open(tmp_file, 'w') as f: + json.dump(data, f) + + # Invoke the bulk service + bulk_response = bulk(tmp_file) + + # Obtain the job id from the response from bulk service + job_id_json = bulk_response.json() + + job_id = job_id_json['job_id'] + logger.info(f"worker number - {worker_number} job id - {job_id}") + + # Get the bulk job status using the job id + verbose = "true" + bulk_job_response = get_bulk_job_status(job_id, verbose) + job_status_json = bulk_job_response.json() + + # Loop until job status is COMPLETED + job_status = job_status_json['status'] + logger.info(job_status) + while job_status != "COMPLETED": + bulk_job_response = get_bulk_job_status(job_id, verbose) + job_status_json = bulk_job_response.json() + job_status = job_status_json['status'] + if job_status == "FAILED": + logger.info("Job FAILED!") + break + sleep(5) + + logger.info(f"worker number - {worker_number} job id - {job_id} job status - {job_status}") + + # Dump the job status json into a file + job_status_dir = resultsdir + "/job_status_jsons" + os.makedirs(job_status_dir, exist_ok=True) + + job_file = job_status_dir + "/job_status" + str(worker_number) + ".json" + logger.info(f"Storing job status in {job_file}") + with open(job_file, 'w') as f: + json.dump(job_status_json, f, indent=4) + + # Fetch the list of experiments for which recommendations are available + logger.info("Fetching processed experiments...") + if job_status_json['status'] == "COMPLETED": + exp_list = list(job_status_json["experiments"].keys()) + else: + exp_list = "" + logger.info(f"No processed experiments, job status - {job_status_json['status']}") + + logger.info("List of processed experiments") + logger.info("**************************************************") + logger.info(exp_list) + logger.info("**************************************************") + + # List recommendations for the experiments for which recommendations are available + recommendations_json_arr = [] + + if exp_list != "": + list_reco_failures = 0 + for exp_name in exp_list: + logger.info(f"Fetching recommendations for {exp_name}...") + list_reco_response = list_recommendations(exp_name) + if list_reco_response.status_code != 200: + list_reco_failures = list_reco_failures + 1 + logger.info(f"List recommendations failed for the experiment - {exp_name}!") + reco = list_reco_response.json() + logger.info(reco) + 
continue + else: + logger.info(f"Fetched recommendations for {exp_name} - Done") + + reco = list_reco_response.json() + recommendations_json_arr.append(reco) + + reco_dir = resultsdir + "/recommendation_jsons" + os.makedirs(reco_dir, exist_ok=True) + reco_file = reco_dir + "/recommendations" + str(worker_number) + ".json" + with open(reco_file, 'w') as f: + json.dump(recommendations_json_arr, f, indent=4) + + if list_reco_failures != 0: + logger.info( + f"List recommendations failed for some of the experiments, check the log {log_file} for details!") + return -1 + else: + return 0 + else: + logger.error("Something went wrong! There are no experiments with recommendations!") + return -1 + except Exception as e: + return {'error': str(e)} + +def parallel_requests_to_bulk(workers, resultsdir): + results = [] + with ThreadPoolExecutor(max_workers=workers) as executor: + # Submit all the tasks to the executor + futures = [ + executor.submit(invoke_bulk, worker_number, resultsdir) + for worker_number in range(1, workers+1) + ] + + # Process the results as they complete + for future in as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as e: + results.append({'error': str(e)}) + + return results + +def parallel_requests_with_time_range(max_workers, resultsdir, initial_end_time, interval_hours, days_of_res): + results = [] + + print(f"days_of_res - {days_of_res}") + print(f"interval_hours - {interval_hours}") + actual_max_workers = int((days_of_res * 24) / interval_hours) + + print(f"max_workers - {max_workers} actual_max_workers - {actual_max_workers}") + if max_workers > actual_max_workers: + print(f"Actual max workers is capped at {actual_max_workers}") + max_workers = actual_max_workers + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all the tasks to the executor + futures = [] + current_end_time = initial_end_time + for worker_number in range(1, max_workers+1): + current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta(hours=interval_hours) + current_end_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') + print(current_start_time) + current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') + current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z') + futures.append(executor.submit(invoke_bulk_with_time_range, worker_number, resultsdir, current_start_time, current_end_time)) + current_end_time = current_start_time + + # Process the results as they complete + for future in as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as e: + results.append({'error': str(e)}) + + return results + + +if __name__ == '__main__': + cluster_type = "openshift" + max_workers = 5 + days_of_res = 15 + results_dir = "." 
+ initial_end_date = "" + interval_hours = 6 + + parser = argparse.ArgumentParser() + + # add the named arguments + parser.add_argument('--workers', type=str, help='specify the number of workers') + parser.add_argument('--startdate', type=str, help='Specify start date and time in "%Y-%m-%dT%H:%M:%S.%fZ" format.') + parser.add_argument('--interval', type=str, help='specify the interval hours') + parser.add_argument('--resultsdir', type=str, help='specify the results dir') + + # parse the arguments from the command line + args = parser.parse_args() + + if args.workers: + max_workers = int(args.workers) + + if args.startdate: + initial_end_date = args.startdate + + if args.interval: + interval_hours = int(args.interval) + + if args.resultsdir: + results_dir = args.resultsdir + + form_kruize_url(cluster_type) + + # Create the metric profile + metric_profile_dir = get_metric_profile_dir() + metric_profile_json_file = metric_profile_dir / 'resource_optimization_local_monitoring.json' + print(metric_profile_json_file) + create_metric_profile(metric_profile_json_file) + + # List datasources + datasource_name = None + list_response = list_datasources(datasource_name) + + # Import datasource metadata + input_json_file = "./import_metadata.json" + meta_response = import_metadata(input_json_file) + metadata_json = meta_response.json() + print(metadata_json) + if meta_response.status_code != 201: + print("Importing metadata from the datasource failed!") + sys.exit(1) + + start_time = time.time() + #responses = parallel_requests_to_bulk(max_workers, results_dir) + responses = parallel_requests_with_time_range(max_workers, results_dir, initial_end_date, interval_hours, days_of_res) + + # Print the results + for i, response in enumerate(responses): + print(f"Response {i+1}: {json.dumps(response, indent=2)}") + + end_time = time.time() + exec_time = end_time - start_time + print(f"Execution time: {exec_time} seconds") diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh new file mode 100755 index 000000000..583642b86 --- /dev/null +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/bulk_scale_test.sh @@ -0,0 +1,198 @@ +#!/bin/bash +# +# Copyright (c) 2023, 2023 IBM Corporation, RedHat and others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +### Script to run scale test with Kruize in local monitoring mode ## +# + +CURRENT_DIR="$(dirname "$(realpath "$0")")" +KRUIZE_REPO="${CURRENT_DIR}/../../../../" + + +# Source the common functions scripts +. ${CURRENT_DIR}/../../common/common_functions.sh + +RESULTS_DIR=kruize_bulk_scale_test_results +APP_NAME=kruize +CLUSTER_TYPE=openshift + +NAMESPACE=openshift-tuning +num_workers=5 +interval_hours=6 +initial_start_date="2024-11-11T00:00:00.000Z" + +kruize_setup=true +prometheus_ds=0 +replicas=1 + +target="crc" +KRUIZE_IMAGE="quay.io/kruize/autotune:mvp_demo" + +function usage() { + echo + echo "Usage: [-i Kruize image] [-w No. 
of workers (default - 5)] [-t interval hours (default - 6)] [-s Initial start date (default - 2024-11-11T00:00:00.000Z)] [-r results directory] [-b kruize setup (default - true)] [-a No. of Kruize replicas (default - 1)] [ -z to test with prometheus datasource]" + exit -1 +} + +function get_kruize_pod_log() { + log_dir=$1 + + # Fetch the kruize pod log + + echo "" + echo "Fetch the kruize pod logs..." + + pod_list=$(kubectl get pods -n ${NAMESPACE} -l app=kruize --output=jsonpath='{.items[*].metadata.name}') + echo $pod_list + mkdir -p "${log_dir}/pod_logs" + for pod in $pod_list; do + kubectl logs -n ${NAMESPACE} $pod > "${log_dir}/pod_logs/$pod.log" 2>&1 & + done +} + +function get_kruize_service_log() { + log=$1 + + # Fetch the kruize service log + + echo "" + echo "Fetch the kruize service logs and store in ${log}..." + kruize_pod="svc/kruize" + kubectl logs -f ${kruize_pod} -n ${NAMESPACE} > ${log} 2>&1 & +} + +# +# "local" flag is turned off by default for now. This needs to be set to true. +# +function kruize_local_thanos_patch() { + CRC_DIR="./manifests/crc/default-db-included-installation" + KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT="${CRC_DIR}/openshift/kruize-crc-openshift.yaml" + KRUIZE_CRC_DEPLOY_MANIFEST_MINIKUBE="${CRC_DIR}/minikube/kruize-crc-minikube.yaml" + + sed -i 's/"name": "prometheus-1"/"name": "thanos"/' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} + + #sed -i 's/"serviceName": ""/"serviceName": "thanos-query-frontend"/' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} + #sed -i 's/"namespace": ""/"namespace": "thanos-bench"/' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} + + sed -i 's/"serviceName": "prometheus-k8s"/"serviceName": ""/' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} + sed -i 's/"namespace": "openshift-monitoring"/"namespace": ""/' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} + sed -i 's#"url": ""#"url": "http://thanos-query-frontend.thanos-bench.svc.cluster.local:9090/"#' ${KRUIZE_CRC_DEPLOY_MANIFEST_OPENSHIFT} +} + + +while getopts r:i:w:s:t:b:a:zh gopts +do + case ${gopts} in + r) + RESULTS_DIR="${OPTARG}" + ;; + i) + KRUIZE_IMAGE="${OPTARG}" + ;; + w) + num_workers="${OPTARG}" + ;; + s) + initial_start_date="${OPTARG}" + ;; + t) + interval_hours="${OPTARG}" + ;; + b) + kruize_setup="${OPTARG}" + ;; + a) + replicas="${OPTARG}" + ;; + z) + prometheus_ds=1 + ;; + h) + usage + ;; + esac +done + +start_time=$(get_date) +LOG_DIR="${RESULTS_DIR}/bulk-scale-test-$(date +%Y%m%d%H%M)" +mkdir -p ${LOG_DIR} + +LOG="${LOG_DIR}/bulk-scale-test.log" + +prometheus_pod_running=$(kubectl get pods --all-namespaces | grep "prometheus-k8s-0") +if [ "${prometheus_pod_running}" == "" ]; then + echo "Prometheus is required to fetch the resource usage metrics for kruize, install prometheus and try again" + exit 1 + +fi + +KRUIZE_SETUP_LOG="${LOG_DIR}/kruize_setup.log" +KRUIZE_SERVICE_LOG="${LOG_DIR}/kruize_service.log" + +# Setup kruize +if [ ${kruize_setup} == true ]; then + echo "Setting up kruize..." | tee -a ${LOG} + echo "$KRUIZE_REPO" + pushd ${KRUIZE_REPO} > /dev/null + # Update datasource + if [ ${prometheus_ds} == 0 ]; then + kruize_local_thanos_patch + fi + + echo "./deploy.sh -c ${CLUSTER_TYPE} -i ${KRUIZE_IMAGE} -m ${target} -t >> ${KRUIZE_SETUP_LOG}" | tee -a ${LOG} + ./deploy.sh -c ${CLUSTER_TYPE} -i ${KRUIZE_IMAGE} -m ${target} -t >> ${KRUIZE_SETUP_LOG} 2>&1 + + sleep 30 + echo "./deploy.sh -c ${CLUSTER_TYPE} -i ${KRUIZE_IMAGE} -m ${target} >> ${KRUIZE_SETUP_LOG}" | tee -a ${LOG} + ./deploy.sh -c ${CLUSTER_TYPE} -i ${KRUIZE_IMAGE} -m ${target} >> ${KRUIZE_SETUP_LOG} 2>&1 & + sleep 60 + + # scale kruize pods + echo "Scaling kruize replicas to ${replicas}..." 
| tee -a ${LOG} + echo "kubectl scale deployments/kruize -n ${NAMESPACE} --replicas=${replicas}" | tee -a ${LOG} + kubectl scale deployments/kruize -n ${NAMESPACE} --replicas=${replicas} | tee -a ${LOG} + sleep 30 + + echo "List the pods..." | tee -a ${LOG} + kubectl get pods -n ${NAMESPACE} | tee -a ${LOG} + + + popd > /dev/null + echo "Setting up kruize...Done" | tee -a ${LOG} +fi + +if [ -z "${SERVER_IP_ADDR}" ]; then + oc expose svc/kruize -n ${NAMESPACE} + + SERVER_IP_ADDR=($(oc status --namespace=${NAMESPACE} | grep "kruize" | grep port | cut -d " " -f1 | cut -d "/" -f3)) + port=0 + echo "SERVER_IP_ADDR = ${SERVER_IP_ADDR} " | tee -a ${LOG} +fi + +echo | tee -a ${LOG} + +get_kruize_pod_log ${LOG_DIR} +get_kruize_service_log ${KRUIZE_SERVICE_LOG} + +# Run the scale test +echo "" +echo "Running scale test for kruize on ${CLUSTER_TYPE}" | tee -a ${LOG} +echo "" +python3 bulk_scale_test.py --workers ${num_workers} --startdate ${initial_start_date} --interval ${interval_hours} --resultsdir ${LOG_DIR} | tee -a ${LOG} + +end_time=$(get_date) +elapsed_time=$(time_diff "${start_time}" "${end_time}") +echo "" +echo "Test took ${elapsed_time} seconds to complete" | tee -a ${LOG} diff --git a/tests/scripts/local_monitoring_tests/bulk_scale_test/import_metadata.json b/tests/scripts/local_monitoring_tests/bulk_scale_test/import_metadata.json new file mode 100644 index 000000000..62975165e --- /dev/null +++ b/tests/scripts/local_monitoring_tests/bulk_scale_test/import_metadata.json @@ -0,0 +1,5 @@ +{ + "version": "v1.0", + "datasource_name": "thanos" +} +
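
A minimal invocation sketch for the new scale test, assuming it is run from its own directory on a cluster where Kruize and the Thanos datasource are reachable; the image tag, worker count, start date, interval, and results directory shown here are illustrative values taken from the script's own defaults, not required settings:

    # Run the bulk scale test: -i Kruize image, -w number of workers, -s initial start date,
    # -t interval hours per worker, -r results directory (see getopts handling in bulk_scale_test.sh)
    cd tests/scripts/local_monitoring_tests/bulk_scale_test
    ./bulk_scale_test.sh -i quay.io/kruize/autotune:mvp_demo -w 5 -s "2024-11-11T00:00:00.000Z" -t 6 -r kruize_bulk_scale_test_results

Each worker posts bulk_input.json (with its own time range) to the /bulk endpoint, polls the returned job id until the job completes, and writes the job status and recommendation JSONs under the results directory.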