Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/gradle/plugins/command-manager/…
Browse files Browse the repository at this point in the history
…org.apache.logging.log4j-log4j-slf4j-impl-2.24.2
  • Loading branch information
AlexRuiz7 authored Dec 13, 2024
2 parents 6a7d171 + 5dd5396 commit 70443db
Show file tree
Hide file tree
Showing 29 changed files with 1,338 additions and 1,330 deletions.
51 changes: 36 additions & 15 deletions plugins/command-manager/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@

import org.opensearch.gradle.test.RestIntegTestTask
import java.util.concurrent.Callable

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.18.0-SNAPSHOT")
opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","")
opensearch_build = opensearch_no_snapshot + ".0"
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
wazuh_version = System.getProperty("version", "5.0.0")
revision = System.getProperty("revision", "0")
}
Expand All @@ -24,7 +28,7 @@ apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.rest-test'
apply plugin: 'opensearch.pluginzip'

def pluginName = 'wazuh-indexer-command-manager'
Expand Down Expand Up @@ -67,6 +71,7 @@ opensearchplugin {
name pluginName
description pluginDescription
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
extendedPlugins = ['opensearch-job-scheduler']
licenseFile rootProject.file('LICENSE.txt')
noticeFile rootProject.file('NOTICE.txt')
}
Expand All @@ -88,6 +93,10 @@ def versions = [
imposter: "4.1.2"
]

configurations {
zipArchive
}

dependencies {
implementation "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
implementation "org.apache.httpcomponents.core5:httpcore5-h2:${versions.httpcore5}"
Expand All @@ -99,6 +108,11 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"

// Job Scheduler stuff
zipArchive group: 'org.opensearch.plugin', name: 'opensearch-job-scheduler', version: "${opensearch_build}"
// implementation "org.opensearch:opensearch:${opensearch_version}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"

testImplementation "junit:junit:${versions.junit}"

// imposter
Expand Down Expand Up @@ -134,30 +148,37 @@ test {
jvmArgs "-Djava.security.policy=./plugins/command-manager/src/main/plugin-metadata/plugin-security.policy/plugin-security.policy"
}

task integTest(type: RestIntegTestTask) {
description = "Run tests against a cluster"
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath
def getJobSchedulerPlugin() {
provider(new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.matching {
include '**/opensearch-job-scheduler*'
}.singleFile
}
}
}
})
}
tasks.named("check").configure { dependsOn(integTest) }

integTest {
testClusters.integTest {
plugin(getJobSchedulerPlugin())
testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// The --debug-jvm command-line option makes the cluster debuggable; this makes the tests debuggable
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005'
}
}

testClusters.integTest {
testDistribution = "INTEG_TEST"
//testDistribution = "ARCHIVE"
// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)

// add customized keystore
keystore 'm_api.auth.username', 'admin'
keystore 'm_api.auth.password', 'test'
keystore 'm_api.uri', 'http://127.0.0.1:55000' // base URI of the M_API
keystore 'm_api.uri', 'https://127.0.0.1:55000' // base URI of the M_API
}

run {
Expand Down
21 changes: 16 additions & 5 deletions plugins/command-manager/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,40 @@ paths:
post:
tags:
- "authentication"
summary: Add a new command to the queue.
description: Add a new command to the queue.
summary: Mock of the Wazuh Server M_API authentication endpoint.
description: Returns a JWT.
responses:
"200":
description: OK
/commands:
post:
tags:
- "commands"
summary: Add a new command to the queue.
description: Add a new command to the queue.
summary: Add commands.
description: Receives and processes an array of commands.
requestBody:
required: true
content:
"application/json":
schema:
$ref: "#/components/schemas/Command"
$ref: "#/components/schemas/Commands"
responses:
"200":
description: OK
"400":
description: parsing_exception
"500":
description: Internal server error (boom!)

components:
schemas:
Commands:
type: object
properties:
commands:
type: array
items:
$ref: '#/components/schemas/Command'
Command:
type: object
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,84 @@
*/
package com.wazuh.commandmanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.*;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ReloadablePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.*;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import com.wazuh.commandmanager.index.CommandIndex;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobParameter;
import com.wazuh.commandmanager.jobscheduler.CommandManagerJobRunner;
import com.wazuh.commandmanager.jobscheduler.JobDocument;
import com.wazuh.commandmanager.rest.RestPostCommandAction;
import com.wazuh.commandmanager.settings.PluginSettings;
import com.wazuh.commandmanager.utils.httpclient.HttpRestClient;

/**
* The Command Manager plugin exposes an HTTP API with a single endpoint to receive raw commands
* from the Wazuh Server. These commands are processed, indexed and sent back to the Server for its
* delivery to, in most cases, the Agents.
* delivery to, in most cases, the Agents. The Command Manager plugin exposes an HTTP API with a
* single endpoint to receive raw commands from the Wazuh Server. These commands are processed,
* indexed and sent back to the Server for its delivery to, in most cases, the Agents.
*
* <p>The Command Manager plugin is also a JobScheduler extension plugin.
*/
public class CommandManagerPlugin extends Plugin implements ActionPlugin, ReloadablePlugin {
public class CommandManagerPlugin extends Plugin
implements ActionPlugin, ReloadablePlugin, JobSchedulerExtension {
public static final String COMMAND_MANAGER_BASE_URI = "/_plugins/_command_manager";
public static final String COMMANDS_URI = COMMAND_MANAGER_BASE_URI + "/commands";
public static final String COMMAND_MANAGER_INDEX_NAME = ".commands";
public static final String COMMAND_MANAGER_INDEX_TEMPLATE_NAME = "index-template-commands";
public static final String COMMAND_DOCUMENT_PARENT_OBJECT_NAME = "command";
public static final String JOB_INDEX_NAME = ".scheduled-commands";
public static final String JOB_INDEX_TEMPLATE_NAME = "index-template-scheduled-commands";
public static final Integer JOB_PERIOD_MINUTES = 1;
public static final Integer PAGE_SIZE = 100;
public static final Long DEFAULT_TIMEOUT_SECONDS = 20L;
public static final TimeValue PIT_KEEP_ALIVE_SECONDS = TimeValue.timeValueSeconds(30L);

private static final Logger log = LogManager.getLogger(CommandManagerPlugin.class);
public static final String JOB_TYPE = "command_manager_scheduler_extension";

private CommandIndex commandIndex;
private JobDocument jobDocument;

@Override
public Collection<Object> createComponents(
Expand All @@ -68,9 +103,51 @@ public Collection<Object> createComponents(
this.commandIndex = new CommandIndex(client, clusterService, threadPool);
PluginSettings.getInstance(environment.settings());

// JobSchedulerExtension stuff
CommandManagerJobRunner.getInstance()
.setThreadPool(threadPool)
.setClient(client)
.setClusterService(clusterService)
.setEnvironment(environment);

scheduleCommandJob(client, clusterService, threadPool);

return Collections.emptyList();
}

/**
* Indexes a document into the jobs index, so that JobScheduler plugin can run it
*
* @param client: The cluster client, used for indexing
* @param clusterService: Provides the addListener method. We use it to determine if this is a
* new cluster.
* @param threadPool: Used by jobDocument to create the document in a thread.
*/
private void scheduleCommandJob(
Client client, ClusterService clusterService, ThreadPool threadPool) {
clusterService.addListener(
event -> {
if (event.localNodeClusterManager() && event.isNewCluster()) {
jobDocument = JobDocument.getInstance();
CompletableFuture<IndexResponse> indexResponseCompletableFuture =
jobDocument.create(
clusterService,
client,
threadPool,
UUIDs.base64UUID(),
getJobType(),
JOB_PERIOD_MINUTES);
indexResponseCompletableFuture.thenAccept(
indexResponse -> {
log.info(
"Scheduled task successfully, response: {}",
indexResponse.getResult().toString());
});
}
});
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
Expand Down Expand Up @@ -100,6 +177,75 @@ public void reload(Settings settings) {
// xxxService.refreshAndClearCache(commandManagerSettings);
}

@Override
public String getJobType() {
return CommandManagerPlugin.JOB_TYPE;
}

@Override
public String getJobIndex() {
return CommandManagerPlugin.JOB_INDEX_NAME;
}

@Override
public ScheduledJobRunner getJobRunner() {
log.info("getJobRunner() executed");
return CommandManagerJobRunner.getInstance();
}

@Override
public ScheduledJobParser getJobParser() {
log.info("getJobParser() executed");
return (parser, id, jobDocVersion) -> {
CommandManagerJobParameter jobParameter = new CommandManagerJobParameter();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case CommandManagerJobParameter.NAME_FIELD:
jobParameter.setJobName(parser.text());
break;
case CommandManagerJobParameter.ENABLED_FIELD:
jobParameter.setEnabled(parser.booleanValue());
break;
case CommandManagerJobParameter.ENABLED_TIME_FIELD:
jobParameter.setEnabledTime(parseInstantValue(parser));
break;
case CommandManagerJobParameter.LAST_UPDATE_TIME_FIELD:
jobParameter.setLastUpdateTime(parseInstantValue(parser));
break;
case CommandManagerJobParameter.SCHEDULE_FIELD:
jobParameter.setSchedule(ScheduleParser.parse(parser));
break;
default:
XContentParserUtils.throwUnknownToken(
parser.currentToken(), parser.getTokenLocation());
}
}
return jobParameter;
};
}

/**
* Returns the proper Instant object with milliseconds from the Unix epoch when the current
* token actually holds a value.
*
* @param parser: The parser as provided by JobScheduler
*/
private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

/**
* Close the resources opened by this plugin.
*
Expand Down
Loading

0 comments on commit 70443db

Please sign in to comment.