Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filtration feature in the bulk API #1391

Merged
merged 8 commits into from
Dec 17, 2024
25 changes: 18 additions & 7 deletions design/BulkAPI.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,23 @@ progress of the job.
{
"filter": {
"exclude": {
"namespace": [],
"namespace": ["openshift-.*"],
"workload": [],
"containers": [],
"labels": {}
"labels": {
"org_id": "ABCOrga",
"source_id": "ZZZ",
"cluster_id": "ABG"
}
},
"include": {
"namespace": [],
"namespace": ["openshift-tuning"],
"workload": [],
"containers": [],
"labels": {
"key1": "value1",
"key2": "value2"
"org_id": "ABCOrga",
"source_id": "ZZZ",
"cluster_id": "ABG"
}
}
},
Expand Down Expand Up @@ -105,10 +110,16 @@ The specified time range determines the period over which the data is analyzed t
- The `start` timestamp precedes the `end` timestamp.

#### 2. **Request Payload with `exclude` filter specified:**
TBA

- **`exclude`** As shown in the example above, it filters out all namespaces starting with the name `openshift-` . So, we'll create experiments and generate recommendations for every namespace except those.

#### 3. **Request Payload with `include` filter specified:**
TBA

- **`include`** As shown in the example above, it filters out the namespace `openshift-`. So, we'll create experiments and generate recommendations for every namespace starting with the specified name.

#### 3. **Request Payload with both `include` and `exclude` filter specified:**

- **`include`** As shown in the example above, it filters out all namespaces starting with the name `openshift-` but includes the `openshift-tuning` one. So, we'll create experiments and generate recommendations for the `openshift-tuning` namespace.

### GET Request:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
return;
}

DataSourceMetadataInfo metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource,"",0,0,0);
DataSourceMetadataInfo metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource,"",0,0,0, new HashMap<>(), new HashMap<>());

// Validate imported metadataInfo object
DataSourceMetadataValidation validationObject = new DataSourceMetadataValidation();
Expand Down
34 changes: 29 additions & 5 deletions src/main/java/com/autotune/analyzer/workerimpl/BulkJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.autotune.operator.KruizeDeploymentInfo.bulk_thread_pool_size;
import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*;
Expand Down Expand Up @@ -125,8 +126,15 @@ public void run() {
DataSourceMetadataInfo metadataInfo = null;
DataSourceManager dataSourceManager = new DataSourceManager();
DataSourceInfo datasource = null;
String labelString = null;
Map<String, String> includeResourcesMap = null;
Map<String, String> excludeResourcesMap = null;
try {
String labelString = getLabels(this.bulkInput.getFilter());
if (this.bulkInput.getFilter() != null) {
labelString = getLabels(this.bulkInput.getFilter());
includeResourcesMap = buildRegexFilters(this.bulkInput.getFilter().getInclude());
excludeResourcesMap = buildRegexFilters(this.bulkInput.getFilter().getExclude());
}
if (null == this.bulkInput.getDatasource()) {
this.bulkInput.setDatasource(CREATE_EXPERIMENT_CONFIG_BEAN.getDatasourceName());
}
Expand All @@ -141,10 +149,13 @@ public void run() {
}
if (null != datasource) {
JSONObject daterange = processDateRange(this.bulkInput.getTime_range());
if (null != daterange)
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get(START_TIME), (Long) daterange.get(END_TIME), (Integer) daterange.get(STEPS));
if (null != daterange) {
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, (Long) daterange.get(START_TIME),
(Long) daterange.get(END_TIME), (Integer) daterange.get(STEPS), includeResourcesMap, excludeResourcesMap);
}
else {
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0, 0);
metadataInfo = dataSourceManager.importMetadataFromDataSource(datasource, labelString, 0, 0,
0, includeResourcesMap, excludeResourcesMap);
}
if (null == metadataInfo) {
setFinalJobStatus(COMPLETED, String.valueOf(HttpURLConnection.HTTP_OK), NOTHING_INFO, datasource);
Expand Down Expand Up @@ -367,7 +378,7 @@ private String getLabels(BulkInput.FilterWrapper filter) {
String uniqueKey = null;
try {
// Process labels in the 'include' section
if (filter != null && filter.getInclude() != null) {
if (filter.getInclude() != null) {
// Initialize StringBuilder for uniqueKey
StringBuilder includeLabelsBuilder = new StringBuilder();
Map<String, String> includeLabels = filter.getInclude().getLabels();
Expand All @@ -390,6 +401,19 @@ private String getLabels(BulkInput.FilterWrapper filter) {
return uniqueKey;
}

private Map<String, String> buildRegexFilters(BulkInput.Filter filter) {
Map<String, String> resourceFilters = new HashMap<>();
if (filter != null) {
resourceFilters.put("namespaceRegex", filter.getNamespace() != null ?
filter.getNamespace().stream().map(String::trim).collect(Collectors.joining("|")) : "");
resourceFilters.put("workloadRegex", filter.getWorkload() != null ?
filter.getWorkload().stream().map(String::trim).collect(Collectors.joining("|")) : "");
resourceFilters.put("containerRegex", filter.getContainers() != null ?
filter.getContainers().stream().map(String::trim).collect(Collectors.joining("|")) : "");
}
return resourceFilters;
}

private JSONObject processDateRange(BulkInput.TimeRange timeRange) {
//TODO: add validations for the time range
JSONObject dateRange = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void updateContainerDataSourceMetadataInfoObject(String dataSourceName, D

if (null == dataSourceNamespace) {
LOGGER.debug(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.INVALID_DATASOURCE_METADATA_NAMESPACE);
return;
continue;
}

// Iterate over workloads in namespaceWorkloadMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
*/
public class DataSourceQueries {
public enum PromQLQuery {
NAMESPACE_QUERY("sum by (namespace) ( avg_over_time(kube_namespace_status_phase{namespace!=\"\" ADDITIONAL_LABEL}[15d]))"),
WORKLOAD_INFO_QUERY("sum by (namespace, workload, workload_type) ( avg_over_time(namespace_workload_pod:kube_pod_owner:relabel{workload!=\"\" ADDITIONAL_LABEL}[15d]))"),
NAMESPACE_QUERY("sum by (namespace) ( avg_over_time(kube_namespace_status_phase{%s ADDITIONAL_LABEL}[15d]))"),
WORKLOAD_INFO_QUERY("sum by (namespace, workload, workload_type) ( avg_over_time(namespace_workload_pod:kube_pod_owner:relabel{%s ADDITIONAL_LABEL}[15d]))"),
CONTAINER_INFO_QUERY("sum by (container, image, workload, workload_type, namespace) (" +
" avg_over_time(kube_pod_container_info{container!=\"\" ADDITIONAL_LABEL}[15d]) *" +
" avg_over_time(kube_pod_container_info{%s ADDITIONAL_LABEL}[15d]) *" +
" on (pod, namespace,prometheus_replica) group_left(workload, workload_type)" +
" avg_over_time(namespace_workload_pod:kube_pod_owner:relabel{workload!=\"\" ADDITIONAL_LABEL}[15d])" +
" avg_over_time(namespace_workload_pod:kube_pod_owner:relabel{workload!~\"\" ADDITIONAL_LABEL}[15d])" +
")");
private final String query;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ public DataSourceManager() {
* @param startTime Get metadata from starttime to endtime
* @param endTime Get metadata from starttime to endtime
* @param steps the interval between data points in a range query
* @param includeResources
* @param excludeResources
* @return
*/
public DataSourceMetadataInfo importMetadataFromDataSource(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps) throws DataSourceDoesNotExist, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
public DataSourceMetadataInfo importMetadataFromDataSource(DataSourceInfo dataSourceInfo, String uniqueKey, long startTime, long endTime, int steps, Map<String, String> includeResources,
Map<String, String> excludeResources) throws DataSourceDoesNotExist, IOException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
String statusValue = "failure";
io.micrometer.core.instrument.Timer.Sample timerImportMetadata = Timer.start(MetricsConfig.meterRegistry());
try {
if (null == dataSourceInfo) {
throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_INFO);
}
DataSourceMetadataInfo dataSourceMetadataInfo = dataSourceMetadataOperator.createDataSourceMetadata(dataSourceInfo, uniqueKey, startTime, endTime, steps);
DataSourceMetadataInfo dataSourceMetadataInfo = dataSourceMetadataOperator.createDataSourceMetadata(dataSourceInfo, uniqueKey, startTime, endTime, steps, includeResources, excludeResources);
if (null == dataSourceMetadataInfo) {
LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE, "for datasource {}" + dataSourceInfo.getName());
return null;
Expand Down Expand Up @@ -105,7 +108,7 @@ public DataSourceMetadataInfo getMetadataFromDataSource(DataSourceInfo dataSourc
String dataSourceName = dataSource.getName();
DataSourceMetadataInfo dataSourceMetadataInfo = dataSourceMetadataOperator.getDataSourceMetadataInfo(dataSource);
if (null == dataSourceMetadataInfo) {
LOGGER.error(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE, "for datasource {}" + dataSourceName);
LOGGER.error(DATASOURCE_METADATA_INFO_NOT_AVAILABLE, "for datasource {}" + dataSourceName);
return null;
}
statusValue = "success";
Expand Down Expand Up @@ -136,9 +139,9 @@ public void updateMetadataFromDataSource(DataSourceInfo dataSource, DataSourceMe
throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceErrorMsgs.MISSING_DATASOURCE_INFO);
}
if (null == dataSourceMetadataInfo) {
throw new DataSourceDoesNotExist(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.DATASOURCE_METADATA_INFO_NOT_AVAILABLE);
throw new DataSourceDoesNotExist(DATASOURCE_METADATA_INFO_NOT_AVAILABLE);
}
dataSourceMetadataOperator.updateDataSourceMetadata(dataSource, "", 0, 0, 0);
dataSourceMetadataOperator.updateDataSourceMetadata(dataSource, "", 0, 0, 0, null, null);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
Expand Down Expand Up @@ -256,7 +259,7 @@ public DataSourceInfo fetchDataSourceFromDBByName(String dataSourceName) {
DataSourceInfo datasource = new ExperimentDBService().loadDataSourceFromDBByName(dataSourceName);
return datasource;
} catch (Exception e) {
LOGGER.error(String.format(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.LOAD_DATASOURCE_FROM_DB_ERROR, dataSourceName, e.getMessage()));
LOGGER.error(String.format(LOAD_DATASOURCE_FROM_DB_ERROR, dataSourceName, e.getMessage()));
}
return null;
}
Expand All @@ -276,7 +279,7 @@ public DataSourceMetadataInfo fetchDataSourceMetadataFromDBByName(String dataSou
DataSourceMetadataInfo metadataInfo = new ExperimentDBService().loadMetadataFromDBByName(dataSourceName, verbose);
return metadataInfo;
} catch (Exception e) {
LOGGER.error(String.format(KruizeConstants.DataSourceConstants.DataSourceMetadataErrorMsgs.LOAD_DATASOURCE_METADATA_FROM_DB_ERROR, dataSourceName, e.getMessage()));
LOGGER.error(String.format(LOAD_DATASOURCE_METADATA_FROM_DB_ERROR, dataSourceName, e.getMessage()));
}
return null;
}
Expand Down
Loading
Loading