Add Time Range Filter for Bulk API #1369

Merged — 8 commits, Nov 15, 2024
47 changes: 34 additions & 13 deletions design/BulkAPI.md
@@ -43,7 +43,10 @@ progress of the job.
}
}
},
-"time_range": {},
+"time_range": {
+    "start": "2024-11-01T00:00:00.000Z",
+    "end": "2024-11-15T23:59:59.000Z"
+},
"datasource": "Cbank1Xyz",
"experiment_types": [
"container",
@@ -82,6 +85,24 @@ progress of the job.
"job_id": "123e4567-e89b-12d3-a456-426614174000"
}
```
### Examples of different payload parameters

#### 1. **Request Payload with `time_range` specified:**

This object allows users to specify the duration for which they want to query data and receive recommendations. It consists of the following fields:

- **`start`**: The starting timestamp of the query duration in ISO 8601 format (`YYYY-MM-DDTHH:mm:ss.sssZ`).
- **`end`**: The ending timestamp of the query duration in ISO 8601 format (`YYYY-MM-DDTHH:mm:ss.sssZ`).

The specified time range determines the period over which the data is analyzed to provide recommendations at the container or namespace level. Ensure that:
- Both `start` and `end` are valid timestamps.
- The `start` timestamp precedes the `end` timestamp.
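
The two checks above can be sketched as a small helper (hypothetical, not part of this PR; the formatter pattern mirrors the `YYYY-MM-DDTHH:mm:ss.sssZ` format documented above):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class TimeRangeCheck {
    // ISO 8601 pattern used by the Bulk API payload examples above
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    // True only when both timestamps parse and start strictly precedes end
    static boolean isValidRange(String start, String end) {
        try {
            LocalDateTime s = LocalDateTime.parse(start, FMT);
            LocalDateTime e = LocalDateTime.parse(end, FMT);
            return s.isBefore(e);
        } catch (DateTimeParseException ex) {
            return false;
        }
    }
}
```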

#### 2. **Request Payload with `exclude` filter specified:**
TBA

#### 3. **Request Payload with `include` filter specified:**
TBA

### GET Request:

@@ -216,28 +237,28 @@ resource optimization in Kubernetes environments. Below is a breakdown of the JS

- Each object in the `experiments` array has the following structure:

-| Field | Type | Description |
-|-------------------------|--------------|--------------------------------------------------------------------------|
-| `name` | `string` | Name of the experiment, typically indicating a service name and deployment context. |
-| `notification` | `object` | Notifications specific to this experiment (if any). |
-| `recommendation` | `object` | Recommendation status and notifications specific to this experiment. |
+| Field | Type | Description |
+|-------------------|----------|-------------------------------------------------------------------------------------|
+| `name` | `string` | Name of the experiment, typically indicating a service name and deployment context. |
+| `notifications` | `object` | Notifications specific to this experiment (if any). |
+| `recommendations` | `object` | Recommendation status and notifications specific to this experiment. |

#### Recommendation Object

-The `recommendation` field within each experiment provides information about recommendation processing status and
+The `recommendations` field within each experiment provides information about recommendation processing status and
errors (if any).

-| Field | Type | Description |
-|-------------------------|--------------|--------------------------------------------------------------------------|
-| `status` | `string` | Status of the recommendation (e.g., `"unprocessed"`, `"processed"`, `"processing"`, `"failed"`). |
-| `notification` | `object` | Notifications related to recommendation processing. |
+| Field | Type | Description |
+|-----------------|----------|--------------------------------------------------------------------------------------------------|
+| `status` | `string` | Status of the recommendation (e.g., `"unprocessed"`, `"processed"`, `"processing"`, `"failed"`). |
+| `notifications` | `object` | Notifications related to recommendation processing. |

#### Notification Object

-Both the `notification` and `recommendation.notification` fields may contain error messages or warnings as follows:
+Both the `notifications` and `recommendations.notifications` fields may contain error messages or warnings as follows:

| Field | Type | Description |
|-------------------------|--------------|----------------------------------------------------------------------------|
| `type` | `string` | Type of notification (e.g., `"info"`,`"error"`, `"warning"`). |
| `message` | `string` | Description of the notification message. |
| `code` | `integer` | HTTP-like code indicating the type of error (e.g., `400` for bad request). |
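
For illustration, a single notification entry matching the fields above might look like this (hypothetical values):

```json
{
  "type": "error",
  "message": "Metadata could not be imported from the datasource.",
  "code": 400
}
```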
21 changes: 11 additions & 10 deletions src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
@@ -139,7 +139,7 @@ public void run() {
if (null != datasource) {
JSONObject daterange = processDateRange(this.bulkInput.getTime_range());
if (null != daterange)
-metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get("start_time"), (Long) daterange.get("end_time"), (Integer) daterange.get("steps"));
+metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get(START_TIME), (Long) daterange.get(END_TIME), (Integer) daterange.get(STEPS));
else {
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0);
}
@@ -233,7 +233,7 @@ public void run() {
}
} catch (IOException e) {
LOGGER.error(e.getMessage());
-jobData.setStatus("FAILED");
+jobData.setStatus(FAILED);
jobData.setEndTime(Instant.now());
BulkJobStatus.Notification notification;
if (e instanceof SocketTimeoutException) {
@@ -248,7 +248,7 @@
} catch (Exception e) {
LOGGER.error(e.getMessage());
e.printStackTrace();
-jobData.setStatus("FAILED");
+jobData.setStatus(FAILED);
jobData.setEndTime(Instant.now());
jobData.setNotification(String.valueOf(HttpURLConnection.HTTP_INTERNAL_ERROR), new BulkJobStatus.Notification(BulkJobStatus.NotificationType.ERROR, e.getMessage(), HttpURLConnection.HTTP_INTERNAL_ERROR));
}
@@ -298,10 +298,10 @@ private String getLabels(BulkInput.FilterWrapper filter) {
includeLabelsBuilder.append(key).append("=").append("\"" + value + "\"").append(",")
);
// Remove trailing comma
-if (includeLabelsBuilder.length() > 0) {
+if (!includeLabelsBuilder.isEmpty()) {
includeLabelsBuilder.setLength(includeLabelsBuilder.length() - 1);
}
-LOGGER.debug("Include Labels: " + includeLabelsBuilder);
+LOGGER.debug("Include Labels: {}", includeLabelsBuilder);
uniqueKey = includeLabelsBuilder.toString();
}
}
@@ -313,10 +313,11 @@ private String getLabels(BulkInput.FilterWrapper filter) {
}

private JSONObject processDateRange(BulkInput.TimeRange timeRange) {
//TODO: add validations for the time range
JSONObject dateRange = null;
if (null != timeRange && timeRange.getStart() != null && timeRange.getEnd() != null) {
-String intervalEndTimeStr = timeRange.getStart();
-String intervalStartTimeStr = timeRange.getEnd();
+String intervalStartTimeStr = timeRange.getStart();
+String intervalEndTimeStr = timeRange.getEnd();
long interval_end_time_epoc = 0;
long interval_start_time_epoc = 0;
LocalDateTime localDateTime = LocalDateTime.parse(intervalEndTimeStr, DateTimeFormatter.ofPattern(KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT));
@@ -327,9 +328,9 @@ private JSONObject processDateRange(BulkInput.TimeRange timeRange) {
Timestamp interval_start_time = Timestamp.from(localDateTime.toInstant(ZoneOffset.UTC));
int steps = CREATE_EXPERIMENT_CONFIG_BEAN.getMeasurementDuration() * KruizeConstants.TimeConv.NO_OF_SECONDS_PER_MINUTE; // todo fetch experiment recommendations setting measurement
dateRange = new JSONObject();
-dateRange.put("start_time", interval_start_time_epoc);
-dateRange.put("end_time", interval_end_time_epoc);
-dateRange.put("steps", steps);
+dateRange.put(START_TIME, interval_start_time_epoc);
+dateRange.put(END_TIME, interval_end_time_epoc);
+dateRange.put(STEPS, steps);
}
return dateRange;
}
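
The timestamp-to-epoch conversion done inside `processDateRange` can be illustrated standalone (a sketch; the pattern string is an assumption standing in for `KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT`):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EpochConversionDemo {
    // Assumed stand-in for KruizeConstants.DateFormats.STANDARD_JSON_DATE_FORMAT
    static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    // Parse an ISO 8601 timestamp and convert it to UTC epoch seconds,
    // the form in which start/end are stored under the start_time/end_time keys
    static long toEpochSeconds(String timestamp) {
        LocalDateTime ldt = LocalDateTime.parse(timestamp, FMT);
        return ldt.toEpochSecond(ZoneOffset.UTC);
    }
}
```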
@@ -15,19 +15,31 @@
*******************************************************************************/
package com.autotune.common.datasource;

import com.autotune.analyzer.exceptions.FetchMetricsError;
import com.autotune.common.data.dataSourceMetadata.*;
import com.autotune.common.data.dataSourceQueries.PromQLDataSourceQueries;
import com.autotune.utils.GenericRestApiClient;
import com.autotune.utils.KruizeConstants;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
import java.util.HashMap;

import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.CHARACTER_ENCODING;

/**
* DataSourceMetadataOperator is an abstraction with CRUD operations to manage DataSourceMetadataInfo Object
* representing JSON for a given data source
@@ -168,19 +180,20 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
String containerQuery = PromQLDataSourceQueries.CONTAINER_QUERY;
if (null != uniqueKey && !uniqueKey.isEmpty()) {
LOGGER.debug("uniquekey: {}", uniqueKey);
-namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
-workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
-containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "," + uniqueKey);
+namespaceQuery = namespaceQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
+workloadQuery = workloadQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
+containerQuery = containerQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "," + uniqueKey);
} else {
-namespaceQuery = namespaceQuery.replace("ADDITIONAL_LABEL", "");
-workloadQuery = workloadQuery.replace("ADDITIONAL_LABEL", "");
-containerQuery = containerQuery.replace("ADDITIONAL_LABEL", "");
+namespaceQuery = namespaceQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
+workloadQuery = workloadQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
+containerQuery = containerQuery.replace(KruizeConstants.KRUIZE_BULK_API.ADDITIONAL_LABEL, "");
}
LOGGER.info("namespaceQuery: {}", namespaceQuery);
LOGGER.info("workloadQuery: {}", workloadQuery);
LOGGER.info("containerQuery: {}", containerQuery);

-JsonArray namespacesDataResultArray = op.getResultArrayForQuery(dataSourceInfo, namespaceQuery);
+JsonArray namespacesDataResultArray = fetchQueryResults(dataSourceInfo, namespaceQuery, startTime, endTime, steps);
+LOGGER.debug("namespacesDataResultArray: {}", namespacesDataResultArray);
if (!op.validateResultArray(namespacesDataResultArray)) {
dataSourceMetadataInfo = dataSourceDetailsHelper.createDataSourceMetadataInfoObject(dataSourceName, null);
} else {
@@ -201,8 +214,8 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
* TODO - get workload metadata for a given namespace
*/
HashMap<String, HashMap<String, DataSourceWorkload>> datasourceWorkloads = new HashMap<>();
-JsonArray workloadDataResultArray = op.getResultArrayForQuery(dataSourceInfo, workloadQuery);
+JsonArray workloadDataResultArray = fetchQueryResults(dataSourceInfo, workloadQuery, startTime, endTime, steps);
+LOGGER.debug("workloadDataResultArray: {}", workloadDataResultArray);

if (op.validateResultArray(workloadDataResultArray)) {
datasourceWorkloads = dataSourceDetailsHelper.getWorkloadInfo(workloadDataResultArray);
@@ -220,8 +233,8 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
* TODO - get container metadata for a given workload
*/
HashMap<String, HashMap<String, DataSourceContainer>> datasourceContainers = new HashMap<>();
-JsonArray containerDataResultArray = op.getResultArrayForQuery(dataSourceInfo, containerQuery);
+JsonArray containerDataResultArray = fetchQueryResults(dataSourceInfo, containerQuery, startTime, endTime, steps);

LOGGER.debug("containerDataResultArray: {}", containerDataResultArray);

if (op.validateResultArray(containerDataResultArray)) {
@@ -235,4 +248,19 @@ public DataSourceMetadataInfo processQueriesAndPopulateDataSourceMetadataInfo(Da
return null;

}

private JsonArray fetchQueryResults(DataSourceInfo dataSourceInfo, String query, long startTime, long endTime, int steps) throws IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
GenericRestApiClient client = new GenericRestApiClient(dataSourceInfo);
String metricsUrl = String.format(KruizeConstants.DataSourceConstants.DATASOURCE_ENDPOINT_WITH_QUERY,
dataSourceInfo.getUrl(),
URLEncoder.encode(query, CHARACTER_ENCODING),
startTime,
endTime,
steps);
LOGGER.debug("MetricsUrl: {}", metricsUrl);
client.setBaseURL(metricsUrl);
JSONObject genericJsonObject = client.fetchMetricsJson(KruizeConstants.APIMessages.GET, "");
JsonObject jsonObject = new Gson().fromJson(genericJsonObject.toString(), JsonObject.class);
return jsonObject.getAsJsonObject(KruizeConstants.JSONKeys.DATA).getAsJsonArray(KruizeConstants.DataSourceConstants.DataSourceQueryJSONKeys.RESULT);
}
}
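
The URL that the new `fetchQueryResults` helper builds targets a Prometheus-style range-query endpoint. A simplified sketch of the URL construction (the format string below is an assumption modeled on Prometheus's `/api/v1/query_range` API; the real template lives in `KruizeConstants.DataSourceConstants.DATASOURCE_ENDPOINT_WITH_QUERY`):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class RangeQueryUrlDemo {
    // Hypothetical template for a Prometheus range query: the PromQL expression
    // is URL-encoded, and start/end are epoch seconds with a step in seconds
    static String buildUrl(String baseUrl, String promql, long start, long end, int steps) {
        return String.format("%s/api/v1/query_range?query=%s&start=%d&end=%d&step=%d",
                baseUrl, URLEncoder.encode(promql, StandardCharsets.UTF_8), start, end, steps);
    }
}
```

Note that special PromQL characters such as `{`, `}`, and `"` must be percent-encoded, which is why the helper routes the query through `URLEncoder` before formatting.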
5 changes: 5 additions & 0 deletions src/main/java/com/autotune/utils/KruizeConstants.java
@@ -798,6 +798,11 @@ public static final class KRUIZE_BULK_API {
public static final String FAILED = "FAILED";
public static final String LIMIT_MESSAGE = "The number of experiments exceeds %s.";
public static final String NOTHING = "Nothing to do.";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String STEPS = "steps";
public static final String ADDITIONAL_LABEL = "ADDITIONAL_LABEL";

// TODO : Bulk API Create Experiments defaults
public static final CreateExperimentConfigBean CREATE_EXPERIMENT_CONFIG_BEAN;
