Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

listExperiments fix for concurrent rm and lm #1421

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion migrations/kruize_local_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ create table IF NOT EXISTS kruize_dsmetadata (id serial, version varchar(255), d
alter table kruize_lm_experiments add column metadata_id bigint references kruize_dsmetadata(id);
alter table if exists kruize_lm_experiments add constraint UK_lm_experiment_name unique (experiment_name);
create table IF NOT EXISTS kruize_metric_profiles (api_version varchar(255), kind varchar(255), metadata jsonb, name varchar(255) not null, k8s_type varchar(255), profile_version float(53) not null, slo jsonb, primary key (name));
alter table kruize_recommendations add column experiment_type varchar(255);
create table IF NOT EXISTS kruize_lm_recommendations (interval_end_time timestamp(6) not null, experiment_name varchar(255) not null, cluster_name varchar(255), extended_data jsonb, version varchar(255),experiment_type varchar(255), primary key (experiment_name, interval_end_time)) PARTITION BY RANGE (interval_end_time);
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ private KruizeObject createKruizeObject(String target_cluster) {
KruizeObject kruizeObject = new KruizeObject();
try {

if (KruizeDeploymentInfo.is_ros_enabled){
if(null == target_cluster || target_cluster.equalsIgnoreCase(AnalyzerConstants.REMOTE)){
if (KruizeDeploymentInfo.is_ros_enabled) {
if (null == target_cluster || target_cluster.equalsIgnoreCase(AnalyzerConstants.REMOTE)) {
new ExperimentDBService().loadExperimentFromDBByName(mainKruizeExperimentMAP, experimentName);
}else{
} else {
new ExperimentDBService().loadLMExperimentFromDBByName(mainKruizeExperimentMAP, experimentName);
}
}else{
} else {
new ExperimentDBService().loadLMExperimentFromDBByName(mainKruizeExperimentMAP, experimentName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
// validate and create KruizeObject if successful
String validationMessage = recommendationEngine.validate_local();
if (validationMessage.isEmpty()) {
KruizeObject kruizeObject = recommendationEngine.prepareRecommendations(calCount, null);
KruizeObject kruizeObject = recommendationEngine.prepareRecommendations(calCount, AnalyzerConstants.LOCAL); // todo target cluster is set to LOCAL always
if (kruizeObject.getValidation_data().isSuccess()) {
LOGGER.debug("UpdateRecommendations API request count: {} success", calCount);
interval_end_time = Utils.DateUtils.getTimeStampFrom(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT,
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/com/autotune/analyzer/services/ListExperiments.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,27 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
String latest = request.getParameter(LATEST);
String recommendations = request.getParameter(KruizeConstants.JSONKeys.RECOMMENDATIONS);
String experimentName = request.getParameter(EXPERIMENT_NAME);
String rm = request.getParameter(AnalyzerConstants.ServiceConstants.RM);
String requestBody = request.getReader().lines().collect(Collectors.joining(System.lineSeparator()));
StringBuilder clusterName = new StringBuilder();
List<KubernetesAPIObject> kubernetesAPIObjectList = new ArrayList<>();
boolean isJSONValid = true;
Map<String, KruizeObject> mKruizeExperimentMap = new ConcurrentHashMap<>();
boolean error = false;
boolean rmTable = false;
// validate Query params
Set<String> invalidParams = new HashSet<>();
for (String param : request.getParameterMap().keySet()) {
if (!KruizeSupportedTypes.QUERY_PARAMS_SUPPORTED.contains(param)) {
invalidParams.add(param);
}
}
if (null != rm
&& !rm.isEmpty()
&& rm.equalsIgnoreCase(AnalyzerConstants.BooleanString.TRUE)
) {
rmTable = true;
}
try {
if (invalidParams.isEmpty()) {
// Set default values if absent
Expand All @@ -142,13 +150,21 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
// parse the requestBody JSON into corresponding classes
parseInputJSON(requestBody, clusterName, kubernetesAPIObjectList);
try {
new ExperimentDBService().loadExperimentFromDBByInputJSON(mKruizeExperimentMap, clusterName, kubernetesAPIObjectList);
if (rmTable)
new ExperimentDBService().loadExperimentFromDBByInputJSON(mKruizeExperimentMap, clusterName, kubernetesAPIObjectList);
else {
new ExperimentDBService().loadLMExperimentFromDBByInputJSON(mKruizeExperimentMap, clusterName, kubernetesAPIObjectList);
}
} catch (Exception e) {
LOGGER.error("Failed to load saved experiment data: {} ", e.getMessage());
}
} else {
// Fetch experiments data from the DB and check if the requested experiment exists
loadExperimentsFromDatabase(mKruizeExperimentMap, experimentName);
if (rmTable) {
loadExperimentsFromDatabase(mKruizeExperimentMap, experimentName);
} else {
loadLMExperimentsFromDatabase(mKruizeExperimentMap, experimentName);
}
}
// Check if experiment exists
if (experimentName != null && !mKruizeExperimentMap.containsKey(experimentName)) {
Expand All @@ -161,18 +177,18 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
);
}
if (!error) {
// create Gson Object
Gson gsonObj = createGsonObject();
// create Gson Object
Gson gsonObj = createGsonObject();

// Modify the JSON response here based on query params.
gsonStr = buildResponseBasedOnQuery(mKruizeExperimentMap, gsonObj, results, recommendations, latest, experimentName);
if (gsonStr.isEmpty()) {
gsonStr = generateDefaultResponse();
}
response.getWriter().println(gsonStr);
response.getWriter().close();
statusValue = "success";
// Modify the JSON response here based on query params.
gsonStr = buildResponseBasedOnQuery(mKruizeExperimentMap, gsonObj, results, recommendations, latest, experimentName);
if (gsonStr.isEmpty()) {
gsonStr = generateDefaultResponse();
}
response.getWriter().println(gsonStr);
response.getWriter().close();
statusValue = "success";
}
} catch (Exception e) {
LOGGER.error("Exception: " + e.getMessage());
e.printStackTrace();
Expand Down Expand Up @@ -278,6 +294,18 @@ private void loadExperimentsFromDatabase(Map<String, KruizeObject> mKruizeExperi
}
}

/**
 * Loads local monitoring (LM) experiments through {@code ExperimentDBService},
 * passing it the supplied map.
 * <p>
 * When {@code experimentName} is {@code null} or empty, all LM experiments are
 * requested; otherwise only the experiment with that name is requested.
 * Any exception raised while loading is logged and not propagated, so callers
 * must inspect the map afterwards to find out whether the experiment was loaded.
 *
 * @param mKruizeExperimentMap map handed to the DB service (presumably filled
 *                             with the loaded experiments keyed by name; confirm
 *                             against {@code ExperimentDBService})
 * @param experimentName       name of the experiment to load, or {@code null}/empty
 *                             to load every LM experiment
 */
private void loadLMExperimentsFromDatabase(Map<String, KruizeObject> mKruizeExperimentMap, String experimentName) {
    try {
        // Constructed inside the try so a failure while creating the service is handled like a load failure.
        ExperimentDBService experimentDBService = new ExperimentDBService();
        if (experimentName == null || experimentName.isEmpty()) {
            experimentDBService.loadAllLMExperiments(mKruizeExperimentMap);
        } else {
            experimentDBService.loadLMExperimentFromDBByName(mKruizeExperimentMap, experimentName);
        }
    } catch (Exception e) {
        // Pass the throwable as the trailing argument so the stack trace and cause are logged too.
        LOGGER.error("Failed to load saved experiment data: {} ", e.getMessage(), e);
    }
}

private Gson createGsonObject() {
return new GsonBuilder()
.disableHtmlEscaping()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,38 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
String experimentName = request.getParameter(AnalyzerConstants.ServiceConstants.EXPERIMENT_NAME);
String latestRecommendation = request.getParameter(AnalyzerConstants.ServiceConstants.LATEST);
String monitoringEndTime = request.getParameter(KruizeConstants.JSONKeys.MONITORING_END_TIME);
String rm = request.getParameter(AnalyzerConstants.ServiceConstants.RM);
Timestamp monitoringEndTimestamp = null;
Map<String, KruizeObject> mKruizeExperimentMap = new ConcurrentHashMap<String, KruizeObject>();
;

boolean getLatest = true;
boolean checkForTimestamp = false;
boolean error = false;
boolean rmTable = false;
if (null != latestRecommendation
&& !latestRecommendation.isEmpty()
&& latestRecommendation.equalsIgnoreCase(AnalyzerConstants.BooleanString.FALSE)
) {
getLatest = false;
}
if (null != rm
&& !rm.isEmpty()
&& rm.equalsIgnoreCase(AnalyzerConstants.BooleanString.TRUE)
) {
rmTable = true;
}
List<KruizeObject> kruizeObjectList = new ArrayList<>();
try {
// Check if experiment name is passed
if (null != experimentName) {
// trim the experiment name to remove whitespaces
experimentName = experimentName.trim();
try {
new ExperimentDBService().loadExperimentAndRecommendationsFromDBByName(mKruizeExperimentMap, experimentName);
if (rmTable) {
new ExperimentDBService().loadExperimentAndRecommendationsFromDBByName(mKruizeExperimentMap, experimentName);
} else {
new ExperimentDBService().loadLMExperimentAndRecommendationsFromDBByName(mKruizeExperimentMap, experimentName);
}
} catch (Exception e) {
LOGGER.error("Loading saved experiment {} failed: {} ", experimentName, e.getMessage());
}
Expand Down Expand Up @@ -151,7 +162,11 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t
}
} else {
try {
new ExperimentDBService().loadAllExperimentsAndRecommendations(mKruizeExperimentMap);
if (rmTable) {
new ExperimentDBService().loadAllExperimentsAndRecommendations(mKruizeExperimentMap);
} else {
new ExperimentDBService().loadAllLMExperimentsAndRecommendations(mKruizeExperimentMap);
}
} catch (Exception e) {
LOGGER.error("Loading saved experiment {} failed: {} ", experimentName, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ public static final class ServiceConstants {
public static final String CLUSTER_NAME = "cluster_name";
public static final String VERBOSE = "verbose";
public static final String FALSE = "false";
public static final String RM = "rm";

private ServiceConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*;
import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.NotificationConstants.*;


/**
* The `run` method processes bulk input to create experiments and generates resource optimization recommendations.
* It handles the creation of experiment names based on various data source components, makes HTTP POST requests
Expand Down Expand Up @@ -121,7 +122,7 @@ private static Map<String, String> parseLabelString(String labelString) {
public void run() {
String statusValue = "failure";
MetricsConfig.activeJobs.incrementAndGet();
Timer.Sample timerRunJob = Timer.start(MetricsConfig.meterRegistry());
io.micrometer.core.instrument.Timer.Sample timerRunJob = Timer.start(MetricsConfig.meterRegistry());
DataSourceMetadataInfo metadataInfo = null;
DataSourceManager dataSourceManager = new DataSourceManager();
DataSourceInfo datasource = null;
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/autotune/database/dao/ExperimentDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.autotune.common.data.ValidationOutputData;
import com.autotune.database.table.*;
import com.autotune.database.table.lm.KruizeLMExperimentEntry;
import com.autotune.database.table.lm.KruizeLMRecommendationEntry;

import java.sql.Timestamp;
import java.util.List;
Expand All @@ -25,6 +26,10 @@ public interface ExperimentDAO {
// Add recommendation to DB
public ValidationOutputData addRecommendationToDB(KruizeRecommendationEntry recommendationEntry);

// Add local monitoring (LM) recommendation to DB
public ValidationOutputData addRecommendationToDB(KruizeLMRecommendationEntry recommendationEntry);


// Add Performance Profile to DB
public ValidationOutputData addPerformanceProfileToDB(KruizePerformanceProfileEntry kruizePerformanceProfileEntry);

Expand All @@ -43,7 +48,7 @@ public interface ExperimentDAO {
// If Kruize object restarts load all experiment which are in inprogress
public List<KruizeExperimentEntry> loadAllExperiments() throws Exception;

// If Kruize object restarts load all local monitoring experiments which are in inprogress

public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception;

// If Kruize object restarts load all results from the experiments which are in inprogress
Expand All @@ -52,6 +57,8 @@ public interface ExperimentDAO {
// If Kruize restarts load all recommendations
List<KruizeRecommendationEntry> loadAllRecommendations() throws Exception;

List<KruizeLMRecommendationEntry> loadAllLMRecommendations() throws Exception;

// If Kruize restarts load all performance profiles
List<KruizePerformanceProfileEntry> loadAllPerformanceProfiles() throws Exception;

Expand All @@ -75,6 +82,8 @@ public interface ExperimentDAO {
// Load all recommendations of a particular experiment
List<KruizeRecommendationEntry> loadRecommendationsByExperimentName(String experimentName) throws Exception;

// Load all local monitoring (LM) recommendations of a particular experiment
List<KruizeLMRecommendationEntry> loadLMRecommendationsByExperimentName(String experimentName) throws Exception;

// Load a single Performance Profile based on name
List<KruizePerformanceProfileEntry> loadPerformanceProfileByName(String performanceProfileName) throws Exception;
Expand All @@ -88,6 +97,8 @@ public interface ExperimentDAO {
// Load all recommendations of a particular experiment and interval end Time
KruizeRecommendationEntry loadRecommendationsByExperimentNameAndDate(String experimentName, String cluster_name, Timestamp interval_end_time) throws Exception;

KruizeLMRecommendationEntry loadLMRecommendationsByExperimentNameAndDate(String experimentName, String cluster_name, Timestamp interval_end_time) throws Exception;

// Get KruizeResult Record
List<KruizeResultsEntry> getKruizeResultsEntry(String experiment_name, String cluster_name, Timestamp interval_start_time, Timestamp interval_end_time) throws Exception;

Expand All @@ -97,6 +108,8 @@ public interface ExperimentDAO {

List<KruizeExperimentEntry> loadExperimentFromDBByInputJSON(StringBuilder clusterName, KubernetesAPIObject kubernetesAPIObject) throws Exception;

List<KruizeLMExperimentEntry> loadLMExperimentFromDBByInputJSON(StringBuilder clusterName, KubernetesAPIObject kubernetesAPIObject) throws Exception;

// Load all the datasources
List<KruizeDataSourceEntry> loadAllDataSources() throws Exception;

Expand Down
Loading
Loading