diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index 86e351f7d..d032e2b50 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -15,7 +15,6 @@ *******************************************************************************/ package com.autotune.analyzer.workerimpl; - import com.autotune.analyzer.kruizeObject.RecommendationSettings; import com.autotune.analyzer.serviceObjects.*; import com.autotune.analyzer.utils.AnalyzerConstants; @@ -31,6 +30,7 @@ import com.autotune.utils.Utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.gson.Gson; +import io.micrometer.core.instrument.Timer; import org.apache.http.conn.ConnectTimeoutException; import org.json.JSONObject; import org.slf4j.Logger; @@ -50,7 +50,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -58,7 +57,6 @@ import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.NotificationConstants.*; - /** * The `run` method processes bulk input to create experiments and generates resource optimization recommendations. * It handles the creation of experiment names based on various data source components, makes HTTP POST requests @@ -123,7 +121,7 @@ private static Map parseLabelString(String labelString) { public void run() { String statusValue = "failure"; MetricsConfig.activeJobs.incrementAndGet(); - io.micrometer.core.instrument.Timer.Sample timerRunJob = Timer.start(MetricsConfig.meterRegistry()); + Timer.Sample timerRunJob = Timer.start(MetricsConfig.meterRegistry()); DataSourceMetadataInfo metadataInfo = null; DataSourceManager dataSourceManager = new DataSourceManager(); DataSourceInfo datasource = null; @@ -170,26 +168,22 @@ public void run() { GenericRestApiClient apiClient = new GenericRestApiClient(finalDatasource); apiClient.setBaseURL(KruizeDeploymentInfo.experiments_url); GenericRestApiClient.HttpResponseWrapper responseCode; - boolean experiment_exists = false; + boolean expriment_exists = false; try { responseCode = apiClient.callKruizeAPI("[" + new Gson().toJson(apiObject) + "]"); LOGGER.debug("API Response code: {}", responseCode); if (responseCode.getStatusCode() == HttpURLConnection.HTTP_CREATED) { - experiment_exists = true; - } else if (responseCode.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - experiment_exists = true; - } else { + expriment_exists = true; } else if (responseCode.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - experiment_exists = true; + expriment_exists = true; } else { - jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, responseCode.getResponseBody().toString(), responseCode.getStatusCode())); } } catch (Exception e) { e.printStackTrace(); experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } finally { - if (!experiment_exists) { + if (!expriment_exists) { LOGGER.info("Processing experiment {}", jobData.getProcessed_experiments()); jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); } @@ -198,9 +192,9 @@ public void run() { setFinalJobStatus(COMPLETED, null, null, finalDatasource); } } - } + } - if (experiment_exists) { + if (expriment_exists) { generateExecutor.submit(() -> { // send request to generateRecommendations API GenericRestApiClient recommendationApiClient = new GenericRestApiClient(finalDatasource); @@ -242,6 +236,7 @@ public void run() { }); } } finally { + // Shutdown createExecutor and wait for it to finish createExecutor.shutdown(); while (!createExecutor.isTerminated()) { try { @@ -252,6 +247,7 @@ public void run() { } } + // Shutdown generateExecutor and wait for it to finish generateExecutor.shutdown(); while (!generateExecutor.isTerminated()) { try { @@ -512,4 +508,3 @@ public String frameExperimentName(String labelString, DataSourceCluster dataSour return experimentName; } } -