Skip to content

Commit

Permalink
Create index-template-commands on POST request (#83)
Browse files Browse the repository at this point in the history
* Create index-template-commands on POST request

* Fix Rest tests
  • Loading branch information
AlexRuiz7 authored Oct 1, 2024
1 parent 285a1ad commit fcf48f3
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
import java.util.List;
import java.util.function.Supplier;


/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to
* receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the
* Agents.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_commandmanager";
public static final String COMMAND_MANAGER_INDEX_NAME = "command-manager";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";

private CommandIndex commandIndex;
private ThreadPool threadPool;

@Override
public Collection<Object> createComponents(
Expand All @@ -57,8 +62,7 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.commandIndex = new CommandIndex(client);
this.threadPool = threadPool;
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
return Collections.emptyList();
}

Expand All @@ -71,6 +75,6 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return Collections.singletonList(new RestPostCommandAction(this.commandIndex, this.threadPool));
return Collections.singletonList(new RestPostCommandAction(this.commandIndex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,103 +9,133 @@

import com.wazuh.commandmanager.CommandManagerPlugin;
import com.wazuh.commandmanager.model.Command;
import com.wazuh.commandmanager.utils.IndexTemplateUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

public class CommandIndex implements IndexingOperationListener {

private static final Logger logger = LogManager.getLogger(CommandIndex.class);

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

/**
* @param client
* Default constructor
*
* @param client OpenSearch client.
* @param clusterService OpenSearch cluster service.
* @param threadPool An OpenSearch ThreadPool.
*/
public CommandIndex(Client client) {
public CommandIndex(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

/**
* @param command a Command class command
* @return Indexing operation RestStatus response
* @throws ExecutionException
* @param command: A Command model object
* @return A CompletableFuture with the RestStatus response from the operation
*/
public RestStatus create(Command command) throws ExecutionException, InterruptedException {
CompletableFuture<IndexResponse> inProgressFuture = new CompletableFuture<>();
public CompletableFuture<RestStatus> asyncCreate(Command command) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);

// Create index template if it does not exist.
if (!indexTemplateExists(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME)) {
putIndexTemplate(CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME);
} else {
logger.info(
"Index template {} already exists. Skipping creation.",
CommandManagerPlugin.COMMAND_MANAGER_INDEX_TEMPLATE_NAME
);
}

logger.debug("Indexing command {}", command);
try {
logger.info("Creating request for command: {}", command.getId());
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);

client.index(
request,
new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
inProgressFuture.complete(indexResponse);
}

@Override
public void onFailure(Exception e) {
logger.info("Could not process command: {}", command.getId(), e);
inProgressFuture.completeExceptionally(e);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = this.threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
);
} catch (IOException e) {
logger.error("IOException occurred creating command details", e);
logger.error(
"Failed to index command with ID {}: {}", command.getId(), e);
}
return inProgressFuture.get().status();
return future;
}

/**
*
* @param command: A Command model object
* @param threadPool: An OpenSearch ThreadPool as passed to the createComponents() method
* @return A CompletableFuture with the RestStatus response from the operation
* @return
*/
public boolean indexTemplateExists(String template_name) {
Map<String, IndexTemplateMetadata> templates = this.clusterService
.state()
.metadata()
.templates();
logger.debug("Existing index templates: {} ", templates);

public CompletableFuture<RestStatus> asyncCreate(Command command, ThreadPool threadPool) {
CompletableFuture<RestStatus> future = new CompletableFuture<>();
ExecutorService executor = threadPool.executor(ThreadPool.Names.WRITE);
return templates.containsKey(template_name);
}

/**
* Inserts an index template
*
* @param templateName : The name if the index template to load
*/
public void putIndexTemplate(String templateName) {
ExecutorService executor = this.threadPool.executor(ThreadPool.Names.WRITE);
try {
IndexRequest request = new IndexRequest()
.index(CommandManagerPlugin.COMMAND_MANAGER_INDEX_NAME)
.source(command.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(command.getId())
.create(true);
executor.submit(
() -> {
try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) {
RestStatus restStatus = client.index(request).actionGet().status();
future.complete(restStatus);
} catch (Exception e) {
future.completeExceptionally(e);
}
// @throws IOException
Map<String, Object> template = IndexTemplateUtils.fromFile(templateName + ".json");

PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest()
.mapping(IndexTemplateUtils.get(template, "mappings"))
.settings(IndexTemplateUtils.get(template, "settings"))
.name(templateName)
.patterns((List<String>) template.get("index_patterns"));

executor.submit(() -> {
AcknowledgedResponse acknowledgedResponse = this.client.admin().indices().putTemplate(putIndexTemplateRequest).actionGet();
if (acknowledgedResponse.isAcknowledged()) {
logger.info(
"Index template created successfully: {}",
templateName
);
}
);
} catch (Exception e) {
logger.error(e);
});

} catch (IOException e) {
logger.error("Error reading index template from filesystem {}", templateName);
}
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(VERSION, this.version);
return builder.endObject();
}

@Override
public String toString() {
return "Action{" +
"type='" + type + '\'' +
", args=" + args +
", version='" + version + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package com.wazuh.commandmanager.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.UUIDs;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
Expand All @@ -19,7 +21,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

public class Command implements ToXContentObject {

public static final String NAME = "command";
public static final String ORDER_ID = "order_id";
public static final String REQUEST_ID = "request_id";
public static final String SOURCE = "source";
Expand Down Expand Up @@ -85,10 +87,12 @@ public static Command parse(XContentParser parser) throws IOException {
String user = null;
Action action = null;

// @TODO check if this call is necessary as ensureExpectedToken is invoked previously
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
// skips JSON's root level "command"
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();

parser.nextToken();
switch (fieldName) {
case SOURCE:
Expand Down Expand Up @@ -132,6 +136,7 @@ public static Command parse(XContentParser parser) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

builder.startObject(NAME);
builder.field(SOURCE, this.source);
builder.field(USER, this.user);
builder.field(TARGET, this.target);
Expand All @@ -141,6 +146,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS, this.status);
builder.field(ORDER_ID, this.orderId);
builder.field(REQUEST_ID, this.requestId);
builder.endObject();

return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -39,17 +38,14 @@ public class RestPostCommandAction extends BaseRestHandler {
public static final String POST_COMMAND_ACTION_REQUEST_DETAILS = "post_command_action_request_details";
private static final Logger logger = LogManager.getLogger(RestPostCommandAction.class);
private final CommandIndex commandIndex;
private final ThreadPool threadPool;

/**
* Default constructor
*
* @param commandIndex persistence layer
* @param threadPool
*/
public RestPostCommandAction(CommandIndex commandIndex, ThreadPool threadPool) {
public RestPostCommandAction(CommandIndex commandIndex) {
this.commandIndex = commandIndex;
this.threadPool = threadPool;

}

Expand All @@ -60,21 +56,21 @@ public String getName() {
@Override
public List<Route> routes() {
return Collections.singletonList(
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
new Route(
POST,
String.format(
Locale.ROOT,
"%s",
CommandManagerPlugin.COMMAND_MANAGER_BASE_URI
)
)
)
);
}

@Override
protected RestChannelConsumer prepareRequest(
final RestRequest restRequest,
final NodeClient client
final RestRequest restRequest,
final NodeClient client
) throws IOException {
// Get request details
XContentParser parser = restRequest.contentParser();
Expand All @@ -84,7 +80,7 @@ protected RestChannelConsumer prepareRequest(

// Send response
return channel -> {
commandIndex.asyncCreate(command, this.threadPool)
this.commandIndex.asyncCreate(command)
.thenAccept(restStatus -> {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
Expand All @@ -94,7 +90,7 @@ protected RestChannelConsumer prepareRequest(
builder.endObject();
channel.sendResponse(new BytesRestResponse(restStatus, builder));
} catch (Exception e) {
logger.error("Error indexing command: ",e);
logger.error("Error indexing command: ", e);
}
}).exceptionally(e -> {
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
Expand Down
Loading

0 comments on commit fcf48f3

Please sign in to comment.