diff --git a/design/BulkAPI.md b/design/BulkAPI.md index 0962c9d7d..2d73b8d0c 100644 --- a/design/BulkAPI.md +++ b/design/BulkAPI.md @@ -51,7 +51,10 @@ progress of the job. "experiment_types": [ "container", "namespace" - ] + ], + "webhook": { + "url" : "http://127.0.0.1:8080/webhook" + } } ``` @@ -75,6 +78,9 @@ progress of the job. - **experiment_types:** Specifies the type(s) of experiments to run, e.g., `"container"` or `"namespace"`. +- **webhook:** The `webhook` parameter allows the system to notify an external service or consumer about the completion status of + an experiment-processing job. Once a job is completed, this webhook will be triggered to send an HTTP request to the URL defined in the bulk request payload. + ### Success Response - **Status:** 200 OK @@ -119,7 +125,10 @@ GET /bulk?job_id=123e4567-e89b-12d3-a456-426614174000 "processed_experiments": 23, "job_id": "54905959-77d4-42ba-8e06-90bb97b823b9", "job_start_time": "2024-10-10T06:07:09.066Z", - "job_end_time": "2024-10-10T06:07:17.471Z" + "job_end_time": "2024-10-10T06:07:17.471Z", + "webhook": { + "status": "COMPLETED" + } } ``` @@ -189,7 +198,7 @@ example 1: } ``` -example 2: +example 2: Job failed ```json { @@ -205,11 +214,35 @@ example 2: }, "job_id": "270fa4d9-2701-4ca0-b056-74229cc28498", "job_start_time": "2024-11-12T15:05:46.362Z", - "job_end_time": "2024-11-12T15:06:05.301Z" + "job_end_time": "2024-11-12T15:06:05.301Z", + "webhook": { + "status": "COMPLETED" + } } ``` +example 3: Only Webhook failed + +```json +{ + "status": "COMPLETED", + "total_experiments": 23, + "processed_experiments": 23, + "job_id": "54905959-77d4-42ba-8e06-90bb97b823b9", + "job_start_time": "2024-10-10T06:07:09.066Z", + "job_end_time": "2024-10-10T06:07:17.471Z", + "webhook": { + "status": "FAILED", + "notifications": { + "type": "ERROR", + "message": "HttpHostConnectException: Unable to connect to the webhook. Please try again later.", + "code": 503 + } + } +} +``` + ### Response Parameters ## API Description: Experiment and Recommendation Processing Status @@ -275,6 +308,18 @@ resource optimization in Kubernetes environments. Below is a breakdown of the JS - **Type**: `String (ISO 8601 format) or null` - **Description**: End timestamp of the job. If the job is still in progress, this will be `null`. +- **webhook**: + - **Type**: `Object` + - **Description**: An object that provides details about the webhook status and any errors encountered during the + webhook invocation. + + - The `webhook` parameter allows the system to notify an external service or consumer about the completion status of + an experiment-processing job. Once a job is completed, this webhook will be triggered to send an HTTP request to the URL defined in the bulk request payload. + This notification mechanism is essential for systems that require real-time updates about the job's processing + status, enabling consumers to take immediate follow-up actions. For example, an external analytics dashboard, a + monitoring service, or a message queue like Kafka can listen for these webhook calls to further process or log the + job completion data. + **Note: Experiment Name:** - **Naming Pattern:** Experiment names are currently formed using the following pattern: @@ -300,27 +345,27 @@ resource optimization in Kubernetes environments. Below is a breakdown of the JS apiVersion: v1 kind: ConfigMap metadata: -name: kruizeconfig -namespace: openshift-tuning + name: kruizeconfig + namespace: openshift-tuning data: -kruizeconfigjson: | - { - "datasource": [ - { - "name": "prometheus-1", - "provider": "prometheus", - "serviceName": "prometheus-k8s", - "namespace": "openshift-monitoring", - "url": "", - "authentication": { - "type": "bearer", - "credentials": { - "tokenFilePath": "/var/run/secrets/kubernetes.io/serviceaccount/token" + kruizeconfigjson: | + { + "datasource": [ + { + "name": "prometheus-1", + "provider": "prometheus", + "serviceName": "prometheus-k8s", + "namespace": "openshift-monitoring", + "url": "", + "authentication": { + "type": "bearer", + "credentials": { + "tokenFilePath": "/var/run/secrets/kubernetes.io/serviceaccount/token" + } } } - } - ] - } + ] + } ``` ## Limits @@ -395,4 +440,5 @@ number of labels, or none at all. Here are some examples: - "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%" -> Default - "%label:org_id%|%label:source_id%|%label:cluster_id%|%namespace%|%workloadtype%|%workloadname%|%containername%" - "%label:org_id%|%namespace%|%workloadtype%|%workloadname%|%containername%" -- "%label:org_id%|%label:cluster_id%|%namespace%|%workloadtype%|%workloadname%" \ No newline at end of file +- "%label:org_id%|%label:cluster_id%|%namespace%|%workloadtype%|%workloadname%" + diff --git a/manifests/crc/BYODB-installation/minikube/kruize-crc-minikube.yaml b/manifests/crc/BYODB-installation/minikube/kruize-crc-minikube.yaml index 2c5bc3d33..d1a9e9261 100644 --- a/manifests/crc/BYODB-installation/minikube/kruize-crc-minikube.yaml +++ b/manifests/crc/BYODB-installation/minikube/kruize-crc-minikube.yaml @@ -36,7 +36,7 @@ data: "local": "true", "logAllHttpReqAndResp": "true", "recommendationsURL" : "http://kruize.monitoring.svc.cluster.local:8080/generateRecommendations?experiment_name=%s", - "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", + "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", "hibernate": { "dialect": "org.hibernate.dialect.PostgreSQLDialect", "driver": "org.postgresql.Driver", diff --git a/manifests/crc/default-db-included-installation/aks/kruize-crc-aks.yaml b/manifests/crc/default-db-included-installation/aks/kruize-crc-aks.yaml index d37708da1..f94cf4c12 100644 --- a/manifests/crc/default-db-included-installation/aks/kruize-crc-aks.yaml +++ b/manifests/crc/default-db-included-installation/aks/kruize-crc-aks.yaml @@ -100,7 +100,7 @@ data: "local": "true", "logAllHttpReqAndResp": "true", "recommendationsURL" : "http://kruize.monitoring.svc.cluster.local:8080/generateRecommendations?experiment_name=%s", - "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", + "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, "hibernate": { diff --git a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml index 0fa7048fb..b4a3c470f 100644 --- a/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml +++ b/manifests/crc/default-db-included-installation/minikube/kruize-crc-minikube.yaml @@ -114,7 +114,7 @@ data: "local": "true", "logAllHttpReqAndResp": "true", "recommendationsURL" : "http://kruize.monitoring.svc.cluster.local:8080/generateRecommendations?experiment_name=%s", - "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", + "experimentsURL" : "http://kruize.monitoring.svc.cluster.local:8080/createExperiment", "experimentNameFormat" : "%datasource%|%clustername%|%namespace%|%workloadname%(%workloadtype%)|%containername%", "bulkapilimit" : 1000, "hibernate": { diff --git a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml index 94b0d2c42..754283328 100644 --- a/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml +++ b/manifests/crc/default-db-included-installation/openshift/kruize-crc-openshift.yaml @@ -140,10 +140,10 @@ data: "namespace": "openshift-monitoring", "url": "", "authentication": { - "type": "bearer", - "credentials": { - "tokenFilePath": "/var/run/secrets/kubernetes.io/serviceaccount/token" - } + "type": "bearer", + "credentials": { + "tokenFilePath": "/var/run/secrets/kubernetes.io/serviceaccount/token" + } } } ] diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkInput.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkInput.java index e5e31d40d..dd5934945 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkInput.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkInput.java @@ -25,6 +25,7 @@ public class BulkInput { private FilterWrapper filter; private TimeRange time_range; private String datasource; + private Webhook webhook; // Getters and Setters @@ -136,4 +137,22 @@ public void setEnd(String end) { this.end = end; } } + + public static class Webhook{ + private String url; + public String getUrl() { + return url; + } + public void setUrl(String url) { + this.url = url; + } + } + + public Webhook getWebhook() { + return webhook; + } + + public void setWebhook(Webhook webhook) { + this.webhook = webhook; + } } diff --git a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java index 16da934bb..7dae51bb6 100644 --- a/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java +++ b/src/main/java/com/autotune/analyzer/serviceObjects/BulkJobStatus.java @@ -48,6 +48,7 @@ public class BulkJobStatus { private String endTime; // Change to String to store formatted time private Map notifications; private Map experiments = Collections.synchronizedMap(new HashMap<>()); + private Webhook webhook; public BulkJobStatus(String jobID, String status, Instant startTime) { this.jobID = jobID; @@ -112,6 +113,14 @@ public void setExperiments(Map experiments) { this.experiments = experiments; } + public Webhook getWebhook() { + return webhook; + } + + public void setWebhook(Webhook webhook) { + this.webhook = webhook; + } + // Method to add a new experiment with "unprocessed" status and null notification public synchronized Experiment addExperiment(String experimentName) { Experiment experiment = new Experiment(experimentName); @@ -188,6 +197,14 @@ public Recommendation getRecommendations() { public void setNotification(Notification notification) { this.notification = notification; } + + public Notification getNotification() { + return notification; + } + + public void setRecommendations(Recommendation recommendations) { + this.recommendations = recommendations; + } } public static class Recommendation { @@ -255,5 +272,29 @@ public void setCode(int code) { } } + public static class Webhook { + private KruizeConstants.KRUIZE_BULK_API.NotificationConstants.WebHookStatus status; + private Notification notifications; // Notifications can hold multiple entries + + public Webhook(KruizeConstants.KRUIZE_BULK_API.NotificationConstants.WebHookStatus status) { + this.status = status; + } + + public KruizeConstants.KRUIZE_BULK_API.NotificationConstants.WebHookStatus getStatus() { + return status; + } + + public void setStatus(KruizeConstants.KRUIZE_BULK_API.NotificationConstants.WebHookStatus status) { + this.status = status; + } + + public Notification getNotifications() { + return notifications; + } + + public void setNotifications(Notification notifications) { + this.notifications = notifications; + } + } -} + } diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index 6813251f5..4f507f51a 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -72,6 +72,15 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se boolean verbose = verboseParam != null && Boolean.parseBoolean(verboseParam); BulkJobStatus jobDetails; LOGGER.info("Job ID: " + jobID); + if (jobStatusMap.isEmpty()) { + sendErrorResponse( + resp, + null, + HttpServletResponse.SC_NOT_FOUND, + JOB_NOT_FOUND_MSG + ); + return; + } jobDetails = jobStatusMap.get(jobID); LOGGER.info("Job Status: " + jobDetails.getStatus()); resp.setContentType(JSON_CONTENT_TYPE); diff --git a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java index aa0c39005..b50f66f51 100644 --- a/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java +++ b/src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java @@ -118,23 +118,22 @@ private static Map parseLabelString(String labelString) { @Override public void run() { + DataSourceMetadataInfo metadataInfo = null; + DataSourceManager dataSourceManager = new DataSourceManager(); + DataSourceInfo datasource = null; try { String labelString = getLabels(this.bulkInput.getFilter()); if (null == this.bulkInput.getDatasource()) { this.bulkInput.setDatasource(CREATE_EXPERIMENT_CONFIG_BEAN.getDatasourceName()); } - DataSourceMetadataInfo metadataInfo = null; - DataSourceManager dataSourceManager = new DataSourceManager(); - DataSourceInfo datasource = null; try { datasource = CommonUtils.getDataSourceInfo(this.bulkInput.getDatasource()); } catch (Exception e) { LOGGER.error(e.getMessage()); e.printStackTrace(); - jobData.setStatus(FAILED); BulkJobStatus.Notification notification = DATASOURCE_NOT_REG_INFO; notification.setMessage(String.format(notification.getMessage(), e.getMessage())); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), notification); + setFinalJobStatus(FAILED,String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST),notification,datasource); } if (null != datasource) { JSONObject daterange = processDateRange(this.bulkInput.getTime_range()); @@ -144,16 +143,13 @@ public void run() { metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0); } if (null == metadataInfo) { - jobData.setStatus(COMPLETED); - jobData.setEndTime(Instant.now()); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO); + setFinalJobStatus(COMPLETED,String.valueOf(HttpURLConnection.HTTP_OK),NOTHING_INFO,datasource); } else { Map createExperimentAPIObjectMap = getExperimentMap(labelString, jobData, metadataInfo, datasource); //Todo Store this map in buffer and use it if BulkAPI pods restarts and support experiment_type jobData.setTotal_experiments(createExperimentAPIObjectMap.size()); jobData.setProcessed_experiments(0); if (jobData.getTotal_experiments() > KruizeDeploymentInfo.BULK_API_LIMIT) { - jobData.setStatus(FAILED); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST), LIMIT_INFO); + setFinalJobStatus(FAILED,String.valueOf(HttpURLConnection.HTTP_BAD_REQUEST),LIMIT_INFO,datasource); } else { ExecutorService createExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); ExecutorService generateExecutor = Executors.newFixedThreadPool(bulk_thread_pool_size); @@ -185,8 +181,7 @@ public void run() { experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_BAD_REQUEST)); } finally { if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { - jobData.setStatus(COMPLETED); - jobData.setEndTime(Instant.now()); + setFinalJobStatus(COMPLETED,null,null,finalDatasource); } } @@ -202,7 +197,6 @@ public void run() { recommendationResponseCode = recommendationApiClient.callKruizeAPI(null); LOGGER.debug("API Response code: {}", recommendationResponseCode); if (recommendationResponseCode.getStatusCode() == HttpURLConnection.HTTP_CREATED) { - jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); experiment.getRecommendations().setStatus(NotificationConstants.Status.PROCESSED); } else { experiment.getRecommendations().setStatus(NotificationConstants.Status.FAILED); @@ -213,9 +207,9 @@ public void run() { experiment.getRecommendations().setStatus(NotificationConstants.Status.FAILED); experiment.getRecommendations().setNotifications(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); } finally { + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { - jobData.setStatus(COMPLETED); - jobData.setEndTime(Instant.now()); + setFinalJobStatus(COMPLETED,null,null,finalDatasource); } } }); @@ -223,8 +217,10 @@ public void run() { } catch (Exception e) { e.printStackTrace(); experiment.setNotification(new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); - } finally { - + jobData.setProcessed_experiments(jobData.getProcessed_experiments() + 1); + if (jobData.getTotal_experiments() == jobData.getProcessed_experiments()) { + setFinalJobStatus(COMPLETED,null,null,finalDatasource); + } } }); } @@ -233,8 +229,6 @@ public void run() { } } catch (IOException e) { LOGGER.error(e.getMessage()); - jobData.setStatus(FAILED); - jobData.setEndTime(Instant.now()); BulkJobStatus.Notification notification; if (e instanceof SocketTimeoutException) { notification = DATASOURCE_GATEWAY_TIMEOUT_INFO; @@ -244,13 +238,44 @@ public void run() { notification = DATASOURCE_DOWN_INFO; } notification.setMessage(String.format(notification.getMessage(), e.getMessage())); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_UNAVAILABLE), notification); + setFinalJobStatus(FAILED,String.valueOf(HttpURLConnection.HTTP_UNAVAILABLE),notification,datasource); } catch (Exception e) { LOGGER.error(e.getMessage()); e.printStackTrace(); - jobData.setStatus(FAILED); - jobData.setEndTime(Instant.now()); - jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR)); + setFinalJobStatus(FAILED,String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR),new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR),datasource); + } + } + + public void setFinalJobStatus(String status,String notificationKey,BulkJobStatus.Notification notification,DataSourceInfo finalDatasource) { + jobData.setStatus(status); + jobData.setEndTime(Instant.now()); + if(null!=notification) + jobData.setNotification(notificationKey,notification); + GenericRestApiClient apiClient = new GenericRestApiClient(finalDatasource); + if(null != bulkInput.getWebhook() && null != bulkInput.getWebhook().getUrl()) { + apiClient.setBaseURL(bulkInput.getWebhook().getUrl()); + GenericRestApiClient.HttpResponseWrapper responseCode; + BulkJobStatus.Webhook webhook = new BulkJobStatus.Webhook(WebHookStatus.IN_PROGRESS); + jobData.setWebhook(webhook); + try { + responseCode = apiClient.callKruizeAPI("[" + new Gson().toJson(jobData) + "]"); + LOGGER.debug("API Response code: {}", responseCode); + if (responseCode.getStatusCode() == HttpURLConnection.HTTP_OK) { + webhook.setStatus(WebHookStatus.COMPLETED); + jobData.setWebhook(webhook); + } else { + BulkJobStatus.Notification webHookNotification = new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, responseCode.getResponseBody().toString(), responseCode.getStatusCode()); + webhook.setNotifications(webHookNotification); + webhook.setStatus(WebHookStatus.FAILED); + jobData.setWebhook(webhook); + } + } catch (Exception e) { + e.printStackTrace(); + BulkJobStatus.Notification webHookNotification = new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.toString(), HttpURLConnection.HTTP_INTERNAL_ERROR); + webhook.setNotifications(webHookNotification); + webhook.setStatus(WebHookStatus.FAILED); + jobData.setWebhook(webhook); + } } } diff --git a/src/main/java/com/autotune/utils/KruizeConstants.java b/src/main/java/com/autotune/utils/KruizeConstants.java index 11b22df33..5e07d3142 100644 --- a/src/main/java/com/autotune/utils/KruizeConstants.java +++ b/src/main/java/com/autotune/utils/KruizeConstants.java @@ -882,6 +882,21 @@ public String getStatus() { } } + public enum WebHookStatus { + INITIATED, // The Webhook has initiated a request + IN_PROGRESS, // The request to the Webhook is actively being processed + QUEUED, // The request to the Webhook has been queued, waiting for resources + SENT, // The request has been sent to the Webhook, but no response yet + RECEIVED, // The Webhook has received a response, but further processing continues + SUCCESS, // The request to the Webhook was successful + FAILED, // The call to the Webhook failed due to an error + RETRYING, // The Webhook is retrying the call due to a transient error + TIMED_OUT, // The request to the Webhook exceeded the allowed response time + ERROR_LOGGED, // The error has been logged for debugging or monitoring + COMPLETED, // The entire process, including subsequent processing, is finished + CANCELLED // The request was cancelled, potentially by user action or system condition + } + } }