Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5. Adding Recommendation Updater Service [KRUIZE-VPA Integration] #1416

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/com/autotune/Autotune.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.autotune.analyzer.exceptions.MonitoringAgentNotFoundException;
import com.autotune.analyzer.exceptions.MonitoringAgentNotSupportedException;
import com.autotune.analyzer.performanceProfiles.MetricProfileCollection;
import com.autotune.analyzer.recommendations.updater.RecommendationUpdaterService;
import com.autotune.analyzer.utils.AnalyzerConstants;
import com.autotune.common.datasource.DataSourceCollection;
import com.autotune.common.datasource.DataSourceInfo;
Expand Down Expand Up @@ -133,6 +134,8 @@ public static void main(String[] args) {
checkAvailableDataSources();
// load available metric profiles from db
loadMetricProfilesFromDB();
// start updater service
startRecommendationUpdaterService();

}
// close the existing session factory before recreating
Expand Down Expand Up @@ -288,4 +291,8 @@ private static void executeDDLs(String ddlFileName) throws Exception {
LOGGER.info(DBConstants.DB_MESSAGES.DB_LIVELINESS_PROBE_SUCCESS);
}

// starts the recommendation updater service
private static void startRecommendationUpdaterService() {
RecommendationUpdaterService.initiateUpdaterService();
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/autotune/analyzer/kruizeObject/KruizeObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class KruizeObject implements ExperimentTypeAware {
private String datasource;
@SerializedName(KruizeConstants.JSONKeys.EXPERIMENT_TYPE) //TODO: to be used in future
private AnalyzerConstants.ExperimentType experimentType;
@SerializedName("default_updater")
private String defaultUpdater;
private String namespace; // TODO: Currently adding it at this level with an assumption that there is only one entry in k8s object needs to be changed
private String mode; //Todo convert into Enum
@SerializedName("target_cluster")
Expand Down Expand Up @@ -310,6 +312,14 @@ public void setExperimentType(AnalyzerConstants.ExperimentType experimentType) {
}


public String getDefaultUpdater() {
return defaultUpdater;
}

public void setDefaultUpdater(String defaultUpdater) {
this.defaultUpdater = defaultUpdater;
}

@Override
public String toString() {
// Creating a temporary cluster name as we allow null for cluster name now
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*******************************************************************************
* Copyright (c) 2024 Red Hat, IBM Corporation and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

package com.autotune.analyzer.recommendations.updater;

import com.autotune.analyzer.experiment.ExperimentInterface;
import com.autotune.analyzer.experiment.ExperimentInterfaceImpl;
import com.autotune.analyzer.kruizeObject.KruizeObject;
import com.autotune.analyzer.recommendations.updater.vpa.VpaUpdaterImpl;
import com.autotune.analyzer.utils.AnalyzerConstants;
import com.autotune.analyzer.utils.AnalyzerErrorConstants;
import com.autotune.database.service.ExperimentDBService;
import com.autotune.database.table.KruizeExperimentEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class RecommendationUpdaterService {

private static final Logger LOGGER = LoggerFactory.getLogger(RecommendationUpdaterService.class);

public static void initiateUpdaterService() {
try {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

LOGGER.info(AnalyzerConstants.RecommendationUpdaterConstants.InfoMsgs.STARTING_SERVICE);
executorService.scheduleAtFixedRate(() -> {
try {
RecommendationUpdaterImpl updater = new RecommendationUpdaterImpl();
Map<String, KruizeObject> experiments = getAutoModeExperiments();
for (Map.Entry<String, KruizeObject> experiment : experiments.entrySet()) {
KruizeObject kruizeObject = updater.generateResourceRecommendationsForExperiment(experiment.getValue().getExperimentName());
// TODO:// add default updater in kruizeObject and check if GPU recommendations are present
if (kruizeObject.getDefaultUpdater() == null) {
kruizeObject.setDefaultUpdater(AnalyzerConstants.RecommendationUpdaterConstants.SupportedUpdaters.VPA);
}

if (kruizeObject.getDefaultUpdater().equalsIgnoreCase(AnalyzerConstants.RecommendationUpdaterConstants.SupportedUpdaters.VPA)) {
VpaUpdaterImpl vpaUpdater = VpaUpdaterImpl.getInstance();
vpaUpdater.applyResourceRecommendationsForExperiment(kruizeObject);
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}, AnalyzerConstants.RecommendationUpdaterConstants.DEFAULT_INITIAL_DELAY,
AnalyzerConstants.RecommendationUpdaterConstants.DEFAULT_SLEEP_INTERVAL,
TimeUnit.SECONDS);
} catch (Exception e) {
LOGGER.error(AnalyzerErrorConstants.RecommendationUpdaterErrors.UPDTAER_SERVICE_START_ERROR + e.getMessage());
}
}

private static Map<String, KruizeObject> getAutoModeExperiments() {
try {
LOGGER.debug(AnalyzerConstants.RecommendationUpdaterConstants.InfoMsgs.CHECKING_AUTO_EXP);
Map<String, KruizeObject> mainKruizeExperimentMap = new ConcurrentHashMap<>();
new ExperimentDBService().loadAllLMExperiments(mainKruizeExperimentMap);
// filter map to only include entries where mode is auto or recreate
Map<String, KruizeObject> filteredMap = mainKruizeExperimentMap.entrySet().stream()
.filter(entry -> {
String mode = entry.getValue().getMode();
return AnalyzerConstants.AUTO.equalsIgnoreCase(mode) || AnalyzerConstants.RECREATE.equalsIgnoreCase(mode);
})
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
return filteredMap;
} catch (Exception e) {
LOGGER.error(e.getMessage());
return new HashMap<>();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ private RecommendationUpdaterConstants() {

}

public static final int DEFAULT_SLEEP_INTERVAL = 60;
public static final int DEFAULT_INITIAL_DELAY = 30;
public static final class SupportedUpdaters {
public static final String VPA = "vpa";

Expand Down Expand Up @@ -719,6 +721,8 @@ public static final class InfoMsgs {
public static final String VPA_PATCHED = "VPA object with name %s is patched successfully with recommendations.";
public static final String CREATEING_VPA = "Creating VPA with name: %s";
public static final String CREATED_VPA = "Created VPA with name: %s";
public static final String STARTING_SERVICE = "Starting recommendation updater.";
public static final String CHECKING_AUTO_EXP = "Searching for experiments with auto or recreate mode.";
private InfoMsgs() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ private RecommendationUpdaterErrors() {

}

public static final String UPDTAER_SERVICE_START_ERROR = "Error occurred while initializing RecommendationUpdaterService.";
public static final String UNSUPPORTED_UPDATER_TYPE = "Updater type %s is not supported.";
public static final String GENERATE_RECOMMNEDATION_FAILED = "Failed to generate recommendations for experiment: {}";
public static final String UPDATER_NOT_INSTALLED = "Updater is not installed.";
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/autotune/database/dao/ExperimentDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface ExperimentDAO {
// If Kruize object restarts load all experiment which are in inprogress
public List<KruizeExperimentEntry> loadAllExperiments() throws Exception;

// If Kruize object restarts load all local monitoring experiments which are in inprogress
public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception;

// If Kruize object restarts load all results from the experiments which are in inprogress
List<KruizeResultsEntry> loadAllResults() throws Exception;

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,29 @@ public List<KruizeExperimentEntry> loadAllExperiments() throws Exception {
return entries;
}

@Override
public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception {
//todo load only experimentStatus=inprogress , playback may not require completed experiments
List<KruizeLMExperimentEntry> entries = null;
String statusValue = "failure";
Timer.Sample timerLoadAllExp = Timer.start(MetricsConfig.meterRegistry());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new timer that is being added? Can you please update this file with the details on what this means

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a new timer. It's already present timerLoadAllExp. Because the table name has changed in Local Monitoring mode to KruizeLMExperiments. The loadAllExperiments function was not returning anything due to this change. This new function is created to load experiments from the local monitoring table KruizeLMExperiments, but it is similar to loadAllExperiments which is used in remote monitoring case.

try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) {
entries = session.createQuery(SELECT_FROM_LM_EXPERIMENTS, KruizeLMExperimentEntry.class).list();
// TODO: remove native sql query and transient
//getExperimentTypeInKruizeExperimentEntry(entries);
statusValue = "success";
} catch (Exception e) {
LOGGER.error("Not able to load experiment due to {}", e.getMessage());
throw new Exception("Error while loading exsisting experiments from database due to : " + e.getMessage());
} finally {
if (null != timerLoadAllExp) {
MetricsConfig.timerLoadAllExp = MetricsConfig.timerBLoadAllExp.tag("status", statusValue).register(MetricsConfig.meterRegistry());
timerLoadAllExp.stop(MetricsConfig.timerLoadAllExp);
}
}
return entries;
}

@Override
public List<KruizeResultsEntry> loadAllResults() throws Exception {
// TODO: load only experimentStatus=inProgress , playback may not require completed experiments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public class DBConstants {

public static final class SQLQUERY {
public static final String SELECT_FROM_EXPERIMENTS = "from KruizeExperimentEntry";
public static final String SELECT_FROM_LM_EXPERIMENTS = "from KruizeLMExperimentEntry";
public static final String SELECT_FROM_EXPERIMENTS_BY_EXP_NAME = "from KruizeExperimentEntry k WHERE k.experiment_name = :experimentName";
public static final String SELECT_FROM_LM_EXPERIMENTS_BY_EXP_NAME = "from KruizeLMExperimentEntry k WHERE k.experiment_name = :experimentName";
public static final String SELECT_FROM_RESULTS = "from KruizeResultsEntry";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ public void loadAllExperiments(Map<String, KruizeObject> mainKruizeExperimentMap
}
}

public void loadAllLMExperiments(Map<String, KruizeObject> mainKruizeExperimentMap) throws Exception {
ExperimentInterface experimentInterface = new ExperimentInterfaceImpl();
List<KruizeLMExperimentEntry> entries = experimentDAO.loadAllLMExperiments();
if (null != entries && !entries.isEmpty()) {
List<CreateExperimentAPIObject> createExperimentAPIObjects = DBHelpers.Converters.KruizeObjectConverters.convertLMExperimentEntryToCreateExperimentAPIObject(entries);
if (null != createExperimentAPIObjects && !createExperimentAPIObjects.isEmpty()) {
List<KruizeObject> kruizeExpList = new ArrayList<>();

int failureThreshHold = createExperimentAPIObjects.size();
int failureCount = 0;
for (CreateExperimentAPIObject createExperimentAPIObject : createExperimentAPIObjects) {
KruizeObject kruizeObject = Converters.KruizeObjectConverters.convertCreateExperimentAPIObjToKruizeObject(createExperimentAPIObject);
if (null != kruizeObject) {
kruizeExpList.add(kruizeObject);
} else {
failureCount++;
}
}
if (failureThreshHold > 0 && failureCount == failureThreshHold) {
throw new Exception("None of the experiments are able to load from DB.");
}
experimentInterface.addExperimentToLocalStorage(mainKruizeExperimentMap, kruizeExpList);
}
}
}

public void loadAllResults(Map<String, KruizeObject> mainKruizeExperimentMap) throws Exception {
ExperimentInterface experimentInterface = new ExperimentInterfaceImpl();
KruizeObject kruizeObject;
Expand Down
Loading