Included kruize local scalability test using bulk API #1396

Open
wants to merge 7 commits into base: mvp_demo
Changes from all commits
60 changes: 46 additions & 14 deletions tests/scripts/helpers/kruize.py
@@ -472,33 +472,65 @@ def generate_recommendations(experiment_name):
print("\n************************************************************")
return response

def post_bulk_api(input_json_file, logger=None):
    if logger:
        logger.info("\n************************************************************")
    else:
        print("\n************************************************************")

    if logger:
        logger.info(f"Sending POST request to URL: {URL}/bulk")
        logger.info(f"Request Payload: {input_json_file}")
    else:
        print("Sending POST request to URL: ", f"{URL}/bulk")
        print("Request Payload: ", input_json_file)

    curl_command = f"curl -X POST {URL}/bulk -H 'Content-Type: application/json' -d '{json.dumps(input_json_file)}'"
    if logger:
        logger.info(f"Equivalent cURL command: {curl_command}")
    else:
        print("Equivalent cURL command: ", curl_command)

    # Send the POST request
    response = requests.post(f"{URL}/bulk", json=input_json_file)

    if logger:
        logger.info(f"Response Status Code: {response.status_code}")
        logger.info(f"Response JSON: {response.json()}")
    else:
        print("Response Status Code: ", response.status_code)
        print("Response JSON: ", response.json())
    return response

def get_bulk_job_status(job_id, verbose=False, logger=None):
    if logger:
        logger.info("\n************************************************************")
    else:
        print("\n************************************************************")

    url_basic = f"{URL}/bulk?job_id={job_id}"
    url_verbose = f"{URL}/bulk?job_id={job_id}&verbose={verbose}"
    getJobIDURL = url_basic
    if verbose:
        getJobIDURL = url_verbose

    if logger:
        logger.info(f"Sending GET request to URL ( verbose={verbose} ): {getJobIDURL}")
    else:
        print("Sending GET request to URL ( verbose=", verbose, " ): ", getJobIDURL)

    curl_command_verbose = f"curl -X GET '{getJobIDURL}'"
    if logger:
        logger.info(f"Equivalent cURL command : {curl_command_verbose}")
    else:
        print("Equivalent cURL command : ", curl_command_verbose)

    # Use the URL selected above so the request matches the one that was logged
    response = requests.get(getJobIDURL)

    if logger:
        logger.info(f"Verbose GET Response Status Code: {response.status_code}")
        logger.info(f"Verbose GET Response JSON: {response.json()}")
    else:
        print("Verbose GET Response Status Code: ", response.status_code)
        print("Verbose GET Response JSON: ", response.json())
    return response
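
For reference, a minimal usage sketch of the updated helpers with a per-worker logger; the logger name and payload below are illustrative, not part of this change:

    import logging
    worker_logger = logging.getLogger("worker_1")  # hypothetical logger
    bulk_payload = {"filter": {"include": {"labels": {"org_id": "org-1", "cluster_id": "eu-1-1"}}}}  # hypothetical payload
    response = post_bulk_api(bulk_payload, worker_logger)
    job_id = response.json()["job_id"]
    status_response = get_bulk_job_status(job_id, verbose=True, logger=worker_logger)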
@@ -0,0 +1,283 @@
"""
Copyright (c) 2024, 2024 Red Hat, IBM Corporation and others.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import requests
import argparse
import json
import os
import subprocess
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
sys.path.append("../../")
from helpers.kruize import *
from helpers.utils import *
import time
import logging
from datetime import datetime, timedelta

def setup_logger(name, log_file, level=logging.INFO):
    handler = logging.FileHandler(log_file)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)

    return logger

def invoke_bulk_with_time_range_labels(worker_number, resultsdir, bulk_json, delay):
    try:
        #time.sleep(delay)
        scale_log_dir = resultsdir + "/scale_logs"
        os.makedirs(scale_log_dir, exist_ok=True)

        org_id = bulk_json['filter']['include']['labels']['org_id']
        cluster_id = bulk_json['filter']['include']['labels']['cluster_id']

        log_id = str(worker_number) + "-" + org_id + "-" + cluster_id

        log_file = f"{scale_log_dir}/worker_{log_id}.log"

        logger = setup_logger(f"logger_{log_id}", log_file)
        logger.info(f"log id = {log_id}")

        logger.info(f"worker number = {worker_number}")

        # Invoke the bulk service
        logger.info("Invoking bulk service with bulk json")
        bulk_response = post_bulk_api(bulk_json, logger)

        # Obtain the job id from the response from bulk service
        job_id_json = bulk_response.json()

        job_id = job_id_json['job_id']
        logger.info(f"worker number - {worker_number} job id - {job_id}")

        # Get the bulk job status using the job id
        verbose = "true"
        bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
        job_status_json = bulk_job_response.json()

        # Loop until job status is COMPLETED
        job_status = job_status_json['status']

        while job_status != "COMPLETED":
            bulk_job_response = get_bulk_job_status(job_id, verbose, logger)
            job_status_json = bulk_job_response.json()
            job_status = job_status_json['status']
            if job_status == "FAILED":
                logger.info("Job FAILED!")
                break
            sleep(5)

        logger.info(f"worker number - {worker_number} job id - {job_id} job status - {job_status}")

        # Dump the job status json into a file
        job_status_dir = resultsdir + "/job_status_jsons"
        os.makedirs(job_status_dir, exist_ok=True)

        job_file = job_status_dir + "/job_status" + log_id + ".json"
        logger.info(f"Storing job status in {job_file}")
        with open(job_file, 'w') as f:
            json.dump(job_status_json, f, indent=4)

        # Fetch the list of experiments for which recommendations are available
        if job_status != "FAILED":
            logger.info("Fetching processed experiments...")
            exp_list = list(job_status_json["experiments"].keys())

            logger.info("List of processed experiments")
            logger.info("**************************************************")
            logger.info(exp_list)
            logger.info("**************************************************")

            # List recommendations for the experiments for which recommendations are available
            recommendations_json_arr = []

            if exp_list:
                list_reco_failures = 0
                for exp_name in exp_list:

                    logger.info(f"Fetching recommendations for {exp_name}...")
                    list_reco_response = list_recommendations(exp_name)
                    if list_reco_response.status_code != 200:
                        list_reco_failures = list_reco_failures + 1
                        logger.info(f"List recommendations failed for the experiment - {exp_name}!")
                        reco = list_reco_response.json()
                        logger.info(reco)
                        continue
                    else:
                        logger.info(f"Fetched recommendations for {exp_name} - Done")

                    reco = list_reco_response.json()
                    recommendations_json_arr.append(reco)

                # Dump the recommendations into a json file
                reco_dir = resultsdir + "/recommendation_jsons"
                os.makedirs(reco_dir, exist_ok=True)
                reco_file = reco_dir + "/recommendations" + log_id + ".json"
                with open(reco_file, 'w') as f:
                    json.dump(recommendations_json_arr, f, indent=4)

                if list_reco_failures != 0:
                    logger.info(
                        f"List recommendations failed for some of the experiments, check the log {log_file} for details!")
                    return -1
                else:
                    return 0
            else:
                logger.error("Something went wrong! There are no experiments with recommendations!")
                return -1
        else:
            logger.info(f"Check {job_file} for job status")
            return -1
    except Exception as e:
        return {'error': str(e)}
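
# A sketch of the bulk input payload that the function below reads and updates per worker.
# The field names are the ones this script accesses; the values are illustrative, not the
# actual contents of ../json_files/bulk_input_timerange.json:
#
# {
#     "filter": {
#         "include": {
#             "labels": {
#                 "org_id": "org-1",
#                 "cluster_id": "eu-1-1"
#             }
#         }
#     },
#     "time_range": {
#         "start": "2024-12-10T05:50:00.000Z",
#         "end": "2024-12-10T11:50:00.001Z"
#     }
# }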

def parallel_requests_with_labels(max_workers, resultsdir, initial_end_time, interval_hours, days_of_res, org_ids, cluster_ids, interval_seconds):
    results = []

    print(f"initial_end_time - {initial_end_time}")
    print(f"days_of_res - {days_of_res}")
    print(f"interval_hours - {interval_hours}")

    num_tsdb_blocks = int((days_of_res * 24) / interval_hours)

    print(f"num_tsdb_blocks - {num_tsdb_blocks}")

    current_end_time = initial_end_time
    # Update time range in the bulk input json
    bulk_json_file = "../json_files/bulk_input_timerange.json"

    with open(bulk_json_file, "r") as json_file:
        bulk_input_json = json.loads(json_file.read())

    for k in range(1, num_tsdb_blocks + 1):

        current_start_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ') - timedelta(
            hours=interval_hours)
        current_end_time = datetime.strptime(current_end_time, '%Y-%m-%dT%H:%M:%S.%fZ')
        print(current_end_time)
        print(current_start_time)
        current_start_time = current_start_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')
        current_end_time = current_end_time.strftime('%Y-%m-%dT%H:%M:%S.000Z')

        for org_id in range(1, org_ids + 1):
            for cluster_id in range(1, cluster_ids + 1):
                org_value = "org-" + str(org_id)
                cluster_value = "eu-" + str(org_id) + "-" + str(cluster_id)

                new_labels = {
                    "org_id": org_value,
                    "cluster_id": cluster_value
                }

                bulk_json = bulk_input_json
                bulk_json['time_range']['start'] = current_start_time
                bulk_json['time_range']['end'] = current_end_time
                bulk_json['filter']['include']['labels'].update(new_labels)

                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    # Submit all the tasks to the executor and keep the futures so the results can be collected
                    futures = []
                    for worker_number in range(1, max_workers + 1):
                        futures.append(executor.submit(invoke_bulk_with_time_range_labels, worker_number, resultsdir, bulk_json, delay=worker_number * interval_seconds))

                    # Process the results as they complete
                    for future in as_completed(futures):
                        try:
                            result = future.result()
                            results.append(result)
                        except Exception as e:
                            results.append({'error': str(e)})

        current_end_time = current_start_time

    return results

if __name__ == '__main__':
    cluster_type = "openshift"
    max_workers = 1
    days_of_res = 1
    results_dir = "."
    initial_end_date = "2024-12-10T11:50:00.001Z"
    interval_hours = 6
    org_ids = 10
    cluster_ids = 10
    rampup_interval_seconds = 2

    parser = argparse.ArgumentParser()

    # add the named arguments
    parser.add_argument('--workers', type=str, help='specify the number of workers')
    parser.add_argument('--enddate', type=str, help='Specify end date and time of the tsdb block in "%%Y-%%m-%%dT%%H:%%M:%%S.%%fZ" format.')
    parser.add_argument('--interval', type=str, help='specify the interval hours')
    parser.add_argument('--resultsdir', type=str, help='specify the results dir')
    parser.add_argument('--org_ids', type=str, help='specify the no. of orgs')
    parser.add_argument('--cluster_ids', type=str, help='specify the no. of clusters / org')

    # parse the arguments from the command line
    args = parser.parse_args()

    if args.workers:
        max_workers = int(args.workers)

    if args.enddate:
        initial_end_date = args.enddate

    if args.interval:
        interval_hours = int(args.interval)

    if args.resultsdir:
        results_dir = args.resultsdir

    if args.org_ids:
        org_ids = int(args.org_ids)

    if args.cluster_ids:
        cluster_ids = int(args.cluster_ids)

    form_kruize_url(cluster_type)

    # Create the metric profile
    metric_profile_dir = get_metric_profile_dir()
    metric_profile_json_file = metric_profile_dir / 'resource_optimization_local_monitoring.json'
    print(metric_profile_json_file)
    create_metric_profile(metric_profile_json_file)

    # List datasources
    datasource_name = None
    list_response = list_datasources(datasource_name)
    list_response_json = list_response.json()

    if list_response_json['datasources'][0]['name'] != "thanos":
        print("Failed! Thanos datasource is not registered with Kruize!")
        sys.exit(1)

    start_time = time.time()
    print(f"initial_end_date to parallel requests - {initial_end_date}")
    responses = parallel_requests_with_labels(max_workers, results_dir, initial_end_date, interval_hours, days_of_res, org_ids, cluster_ids, rampup_interval_seconds)

    # Print the results
    for i, response in enumerate(responses):
        print(f"Response {i+1}: {json.dumps(response, indent=2)}")

    end_time = time.time()
    exec_time = end_time - start_time
    print(f"Execution time: {exec_time} seconds")