From 3cc9d9973937b3f864f914b4253f738f3b37d39b Mon Sep 17 00:00:00 2001 From: Mario Rodriguez Molins Date: Thu, 6 Jun 2024 17:13:24 +0200 Subject: [PATCH] Create Agent policies per each test execution (#1866) This PR adds a new Agent Policy that is created and configured independently for each system test execution. Creating this new Agent Policy ensures that data streams are always empty at the beginning of the test and prevents two tests from writing into the same Data Stream. --- .../pipeline.trigger.integration.tests.sh | 1 - internal/agentdeployer/agent.go | 4 +- internal/servicedeployer/compose.go | 4 +- internal/servicedeployer/custom_agent.go | 2 +- internal/testrunner/runners/system/runner.go | 225 +++++++++--------- 5 files changed, 111 insertions(+), 125 deletions(-) diff --git a/.buildkite/pipeline.trigger.integration.tests.sh b/.buildkite/pipeline.trigger.integration.tests.sh index dc1e4d2ad..3e99ddf42 100755 --- a/.buildkite/pipeline.trigger.integration.tests.sh +++ b/.buildkite/pipeline.trigger.integration.tests.sh @@ -96,7 +96,6 @@ for package in $(find . -maxdepth 1 -mindepth 1 -type d) ; do label_suffix=" (independent agent)" fi package_name=$(basename "${package}") - if [[ "$independent_agent" == "false" && "$package_name" == "oracle" ]]; then echoerr "Package \"${package_name}\" skipped: not supported with Elastic Agent running in the stack (missing required software)." continue diff --git a/internal/agentdeployer/agent.go b/internal/agentdeployer/agent.go index 246348255..ce1ac8534 100644 --- a/internal/agentdeployer/agent.go +++ b/internal/agentdeployer/agent.go @@ -391,12 +391,12 @@ func (s *dockerComposeDeployedAgent) TearDown(ctx context.Context) error { defer func() { // Remove the service logs dir for this agent if err := os.RemoveAll(s.agentInfo.Logs.Folder.Local); err != nil { - logger.Errorf("could not remove the agent logs (path: %s): %w", s.agentInfo.Logs.Folder.Local, err) + logger.Errorf("could not remove the agent logs (path: %s): %v", s.agentInfo.Logs.Folder.Local, err) } // Remove the configuration dir for this agent (e.g. compose scenario files) if err := os.RemoveAll(s.agentInfo.ConfigDir); err != nil { - logger.Errorf("could not remove the agent configuration directory (path: %s) %w", s.agentInfo.ConfigDir, err) + logger.Errorf("could not remove the agent configuration directory (path: %s) %v", s.agentInfo.ConfigDir, err) } }() diff --git a/internal/servicedeployer/compose.go b/internal/servicedeployer/compose.go index 9dc274b36..989e15d81 100644 --- a/internal/servicedeployer/compose.go +++ b/internal/servicedeployer/compose.go @@ -98,7 +98,7 @@ func (d *DockerComposeServiceDeployer) SetUp(ctx context.Context, svcInfo Servic // service logs folder must no be deleted to avoid breaking log files written // by the service. If this is required, those files should be rotated or truncated // so the service can still write to them.
- logger.Debug("Skipping removing service logs folder folder %s", svcInfo.Logs.Folder.Local) + logger.Debugf("Skipping removing service logs folder folder %s", svcInfo.Logs.Folder.Local) } else { err = files.RemoveContent(svcInfo.Logs.Folder.Local) if err != nil { @@ -242,7 +242,7 @@ func (s *dockerComposeDeployedService) TearDown(ctx context.Context) error { } // Remove the outputs generated by the service container if err = os.RemoveAll(s.svcInfo.OutputDir); err != nil { - logger.Errorf("could not remove the temporary output files %w", err) + logger.Errorf("could not remove the temporary output files %s", err) } }() diff --git a/internal/servicedeployer/custom_agent.go b/internal/servicedeployer/custom_agent.go index 36cc111e8..2c8e01ab8 100644 --- a/internal/servicedeployer/custom_agent.go +++ b/internal/servicedeployer/custom_agent.go @@ -129,7 +129,7 @@ func (d *CustomAgentDeployer) SetUp(ctx context.Context, svcInfo ServiceInfo) (D // service logs folder must no be deleted to avoid breaking log files written // by the service. If this is required, those files should be rotated or truncated // so the service can still write to them. - logger.Debug("Skipping removing service logs folder folder %s", svcInfo.Logs.Folder.Local) + logger.Debugf("Skipping removing service logs folder folder %s", svcInfo.Logs.Folder.Local) } else { err = files.RemoveContent(svcInfo.Logs.Folder.Local) if err != nil { diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index 33bee6ffe..3ba839b73 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -161,11 +161,11 @@ type runner struct { // Execution order of following handlers is defined in runner.TearDown() method. removeAgentHandler func(context.Context) error deleteTestPolicyHandler func(context.Context) error + cleanTestScenarioHandler func(context.Context) error resetAgentPolicyHandler func(context.Context) error resetAgentLogLevelHandler func(context.Context) error shutdownServiceHandler func(context.Context) error shutdownAgentHandler func(context.Context) error - wipeDataStreamHandler func(context.Context) error } type SystemRunnerOptions struct { @@ -315,7 +315,13 @@ func (r *runner) Run(ctx context.Context) ([]testrunner.TestResult, error) { if err != nil { return result.WithError(fmt.Errorf("failed to prepare scenario: %w", err)) } - return r.validateTestScenario(ctx, result, scenario, testConfig) + results, err := r.validateTestScenario(ctx, result, scenario, testConfig) + tdErr := r.tearDownTest(ctx) + if tdErr != nil { + logger.Errorf("failed to tear down runner: %s", tdErr.Error()) + } + return results, err + } if r.runTearDown { @@ -456,6 +462,9 @@ func (r *runner) tearDownTest(ctx context.Context) error { // Avoid cancellations during cleanup. cleanupCtx := context.WithoutCancel(ctx) + // This handler should be run before shutting down Elastic Agents (agent deployer) + // or services that could run agents like Custom Agents (service deployer) + // or Kind deployer. if r.resetAgentPolicyHandler != nil { if err := r.resetAgentPolicyHandler(cleanupCtx); err != nil { return err @@ -463,6 +472,23 @@ func (r *runner) tearDownTest(ctx context.Context) error { r.resetAgentPolicyHandler = nil } + // Shutting down the service should be run as one of the first actions + // to ensure that resources created by terraform are deleted even if other + // steps fail.
+ if r.shutdownServiceHandler != nil { + if err := r.shutdownServiceHandler(cleanupCtx); err != nil { + return err + } + r.shutdownServiceHandler = nil + } + + if r.cleanTestScenarioHandler != nil { + if err := r.cleanTestScenarioHandler(cleanupCtx); err != nil { + return err + } + r.cleanTestScenarioHandler = nil + } + if r.resetAgentLogLevelHandler != nil { if err := r.resetAgentLogLevelHandler(cleanupCtx); err != nil { return err @@ -493,13 +519,6 @@ func (r *runner) tearDownTest(ctx context.Context) error { return err } - if r.shutdownServiceHandler != nil { - if err := r.shutdownServiceHandler(cleanupCtx); err != nil { - return err - } - r.shutdownServiceHandler = nil - } - if r.shutdownAgentHandler != nil { if err := r.shutdownAgentHandler(cleanupCtx); err != nil { return err @@ -507,13 +526,6 @@ func (r *runner) tearDownTest(ctx context.Context) error { r.shutdownAgentHandler = nil } - if r.wipeDataStreamHandler != nil { - if err := r.wipeDataStreamHandler(cleanupCtx); err != nil { - return err - } - r.wipeDataStreamHandler = nil - } - return nil } @@ -852,6 +864,20 @@ type scenarioTest struct { startTestTime time.Time } +func (r *runner) deleteDataStream(ctx context.Context, dataStream string) error { + resp, err := r.esAPI.Indices.DeleteDataStream([]string{dataStream}, + r.esAPI.Indices.DeleteDataStream.WithContext(ctx), + ) + if err != nil { + return fmt.Errorf("failed to delete data stream %s: %w", dataStream, err) + } + defer resp.Body.Close() + if resp.IsError() { + return fmt.Errorf("could not delete data stream %s: %s", dataStream, resp.String()) + } + return nil +} + func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInfo servicedeployer.ServiceInfo) (*scenarioTest, error) { serviceOptions := r.createServiceOptions(config.ServiceVariantName) @@ -911,47 +937,64 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf } // Configure package (single data stream) via Fleet APIs. - var policyToTest, policyToEnroll *kibana.Policy + testTime := time.Now().Format("20060102T15:04:05Z") + var policyToTest, policyCurrent, policyToEnroll *kibana.Policy if r.runTearDown || r.runTestsOnly { - policyToTest = &serviceStateData.CurrentPolicy + policyCurrent = &serviceStateData.CurrentPolicy policyToEnroll = &serviceStateData.EnrollPolicy - logger.Debugf("Got policy from file: %q - %q", policyToTest.Name, policyToTest.ID) + logger.Debugf("Got current policy from file: %q - %q", policyCurrent.Name, policyCurrent.ID) } else { - // Create two different policies, one for enrolling the agent and the other for testing. - // This allows us to ensure that the Agent Policy used for testing is - // assigned to the agent with all the required changes (e.g. Package DataStream) - logger.Debug("creating test policies...") - testTime := time.Now().Format("20060102T15:04:05Z") - + // Create a specific Agent Policy for enrolling purposes + // There are some issues when the stack is running for some time: + // agents cannot enroll with the default policy + // This enroll policy must be created even if independent Elastic Agents are not used. Agents created + // in Kubernetes or Custom Agents require this enroll policy too (service deployer).
+ logger.Debug("creating enroll policy...") policyEnroll := kibana.Policy{ Name: fmt.Sprintf("ep-test-system-enroll-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, testTime), Description: fmt.Sprintf("test policy created by elastic-package to enroll agent for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream), - Namespace: "ep", + Namespace: common.CreateTestRunID(), } - policyTest := kibana.Policy{ + policyToEnroll, err = r.kibanaClient.CreatePolicy(ctx, policyEnroll) + if err != nil { + return nil, fmt.Errorf("could not create enroll policy: %w", err) + } + } + + if r.runTearDown { + // required to assign the policy stored in the service state file + // so the data stream related to this Agent Policy can be obtained (and deleted) + // in the cleanTestScenarioHandler handler + policyToTest = policyCurrent + } else { + // Create a specific Agent Policy just for this test. + // This allows us to ensure that the Agent Policy used for testing is + // assigned to the agent with all the required changes (e.g. Package DataStream) + logger.Debug("creating test policy...") + policy := kibana.Policy{ Name: fmt.Sprintf("ep-test-system-%s-%s-%s", r.testFolder.Package, r.testFolder.DataStream, testTime), Description: fmt.Sprintf("test policy created by elastic-package test system for data stream %s/%s", r.testFolder.Package, r.testFolder.DataStream), - Namespace: "ep", + Namespace: common.CreateTestRunID(), } // Assign the data_output_id to the agent policy to configure the output to logstash. The value is inferred from stack/_static/kibana.yml.tmpl if r.profile.Config("stack.logstash_enabled", "false") == "true" { - policyTest.DataOutputID = "fleet-logstash-output" - } - policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policyTest) - if err != nil { - return nil, fmt.Errorf("could not create test policy: %w", err) + policy.DataOutputID = "fleet-logstash-output" } - policyToEnroll, err = r.kibanaClient.CreatePolicy(ctx, policyEnroll) + policyToTest, err = r.kibanaClient.CreatePolicy(ctx, policy) if err != nil { return nil, fmt.Errorf("could not create test policy: %w", err) } } + r.deleteTestPolicyHandler = func(ctx context.Context) error { logger.Debug("deleting test policies...") if err := r.kibanaClient.DeletePolicy(ctx, policyToTest.ID); err != nil { return fmt.Errorf("error cleaning up test policy: %w", err) } + if r.runTestsOnly { + return nil + } if err := r.kibanaClient.DeletePolicy(ctx, policyToEnroll.ID); err != nil { return fmt.Errorf("error cleaning up test policy: %w", err) } @@ -963,7 +1006,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf if r.runTearDown || r.runTestsOnly { // required in order to be able select the right agent in `checkEnrolledAgents` when // using independent agents or custom/kubernetes agents since policy data is set into `agentInfo` variable` - policy = policyToTest + policy = policyCurrent } agentDeployed, agentInfo, err := r.setupAgent(ctx, config, serviceStateData, policy, scenario.pkgManifest.Agent) @@ -1007,8 +1050,8 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf scenario.startTestTime = time.Now() logger.Debug("adding package data stream to test policy...") - ds := createPackageDatastream(*policyToTest, *scenario.pkgManifest, policyTemplate, *scenario.dataStreamManifest, *config, svcInfo.Test.RunID) - if r.runTearDown || r.runTestsOnly { + ds := createPackageDatastream(*policyToTest, *scenario.pkgManifest, policyTemplate, *scenario.dataStreamManifest,
*config, policyToTest.Namespace) + if r.runTearDown { logger.Debug("Skip adding data stream config to policy") } else { if err := r.kibanaClient.AddPackageDataStreamToPolicy(ctx, ds); err != nil { @@ -1040,29 +1083,13 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf dataStreamDataset, ) - r.wipeDataStreamHandler = func(ctx context.Context) error { - logger.Debugf("deleting data in data stream...") - if err := deleteDataStreamDocs(ctx, r.esAPI, scenario.dataStream); err != nil { - return fmt.Errorf("error deleting data in data stream: %w", err) - } - return nil - } - - switch { - case r.runTearDown: - logger.Debugf("Skipped deleting old data in data stream %q", scenario.dataStream) - case r.runTestsOnly: - // In this mode, service is still running and the agent is sending documents, so sometimes - // cannot be guaranteed to be zero documents - err := r.deleteOldDocumentsDataStreamAndWait(ctx, scenario.dataStream, false) + r.cleanTestScenarioHandler = func(ctx context.Context) error { + logger.Debugf("Deleting data stream for testing %s", scenario.dataStream) + err := r.deleteDataStream(ctx, scenario.dataStream) if err != nil { - return nil, err - } - default: - err := r.deleteOldDocumentsDataStreamAndWait(ctx, scenario.dataStream, true) - if err != nil { - return nil, err + return fmt.Errorf("failed to delete data stream %s: %w", scenario.dataStream, err) } + return nil } // FIXME: running per stages does not work when multiple agents are created @@ -1075,6 +1102,9 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf logger.Debugf("Selected enrolled agent %q", agent.ID) r.removeAgentHandler = func(ctx context.Context) error { + if r.runTestsOnly { + return nil + } // When not using independent agents, service deployers like kubernetes or custom agents create new Elastic Agent if !r.runIndependentElasticAgent && !svcInfo.Agent.Independent { return nil @@ -1087,7 +1117,7 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf return nil } - if r.runTearDown || r.runTestsOnly { + if r.runTearDown { origPolicy = serviceStateData.OrigPolicy logger.Debugf("Got orig policy from file: %q - %q", origPolicy.Name, origPolicy.ID) } else { @@ -1097,9 +1127,11 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf Revision: agent.PolicyRevision, } } - // Assign policy to agent + r.resetAgentPolicyHandler = func(ctx context.Context) error { - if r.runIndependentElasticAgent { + if r.runSetup { + // the same policy should be kept only when system tests are + // triggered with the flags for running the setup stage (--setup) return nil } logger.Debug("reassigning original policy back to agent...") @@ -1123,6 +1155,9 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf } } r.resetAgentLogLevelHandler = func(ctx context.Context) error { + if r.runTestsOnly { + return nil + } logger.Debugf("reassigning original log level %q back to agent...", origLogLevel) if err := r.kibanaClient.SetAgentLogLevel(ctx, agent.ID, origLogLevel); err != nil { @@ -1131,8 +1166,8 @@ func (r *runner) prepareScenario(ctx context.Context, config *testConfig, svcInf return nil } - if r.runTearDown || r.runTestsOnly { - logger.Debug("Skip assiging package data stream to agent") + if r.runTearDown { + logger.Debug("Skip assigning package data stream to agent") } else { policyWithDataStream, err := r.kibanaClient.GetPolicy(ctx, policyToTest.ID) if err != nil { @@
-1275,6 +1310,9 @@ func (r *runner) setupService(ctx context.Context, config *testConfig, serviceOp } r.shutdownServiceHandler = func(ctx context.Context) error { + if r.runTestsOnly { + return nil + } logger.Debug("tearing down service...") if err := service.TearDown(ctx); err != nil { return fmt.Errorf("error tearing down service: %w", err) @@ -1314,6 +1352,9 @@ func (r *runner) setupAgent(ctx context.Context, config *testConfig, state Servi return nil, agentInfo, fmt.Errorf("could not setup agent: %w", err) } r.shutdownAgentHandler = func(ctx context.Context) error { + if r.runTestsOnly { + return nil + } if agentDeployer == nil { return nil } @@ -1404,39 +1445,6 @@ func (r *runner) writeScenarioState(opts scenarioStateOpts) error { return nil } -func (r *runner) deleteOldDocumentsDataStreamAndWait(ctx context.Context, dataStream string, mustBeZero bool) error { - logger.Debugf("Delete previous documents in data stream %q", dataStream) - if err := deleteDataStreamDocs(ctx, r.esAPI, dataStream); err != nil { - return fmt.Errorf("error deleting old data in data stream: %s: %w", dataStream, err) - } - startHits, err := r.getDocs(ctx, dataStream) - if err != nil { - return err - } - // First call already reports zero documents - if startHits.size() == 0 { - return nil - } - cleared, err := wait.UntilTrue(ctx, func(ctx context.Context) (bool, error) { - hits, err := r.getDocs(ctx, dataStream) - if err != nil { - return false, err - } - - if mustBeZero { - return hits.size() == 0, nil - } - return startHits.size() > hits.size(), nil - }, 1*time.Second, 2*time.Minute) - if err != nil || !cleared { - if err == nil { - err = errors.New("unable to clear previous data") - } - return err - } - return nil -} - func (r *runner) validateTestScenario(ctx context.Context, result *testrunner.ResultComposer, scenario *scenarioTest, config *testConfig) ([]testrunner.TestResult, error) { // Validate fields in docs // when reroute processors are used, expectedDatasets should be set depends on the processor config @@ -1610,7 +1618,7 @@ func createIntegrationPackageDatastream( ) kibana.PackageDataStream { r := kibana.PackageDataStream{ Name: fmt.Sprintf("%s-%s-%s", pkg.Name, ds.Name, suffix), - Namespace: "ep", + Namespace: kibanaPolicy.Namespace, PolicyID: kibanaPolicy.ID, Enabled: true, Inputs: []kibana.Input{ @@ -1664,7 +1672,7 @@ func createInputPackageDatastream( ) kibana.PackageDataStream { r := kibana.PackageDataStream{ Name: fmt.Sprintf("%s-%s-%s", pkg.Name, policyTemplate.Name, suffix), - Namespace: "ep", + Namespace: kibanaPolicy.Namespace, PolicyID: kibanaPolicy.ID, Enabled: true, } @@ -1961,27 +1969,6 @@ func (r *runner) previewTransform(ctx context.Context, transformId string) ([]co return preview.Documents, nil } -func deleteDataStreamDocs(ctx context.Context, api *elasticsearch.API, dataStream string) error { - body := strings.NewReader(`{ "query": { "match_all": {} } }`) - resp, err := api.DeleteByQuery([]string{dataStream}, body, - api.DeleteByQuery.WithContext(ctx), - ) - if err != nil { - return fmt.Errorf("failed to delete data stream docs: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode == http.StatusNotFound { - // Unavailable index is ok, this means that data is already not there. 
- return nil - } - if resp.IsError() { - return fmt.Errorf("failed to delete data stream docs for data stream %s: %s", dataStream, resp.String()) - } - - return nil -} - func filterAgents(allAgents []kibana.Agent, svcInfo servicedeployer.ServiceInfo) []kibana.Agent { if svcInfo.Agent.Host.NamePrefix != "" { logger.Debugf("filter agents using criteria: NamePrefix=%s", svcInfo.Agent.Host.NamePrefix)
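For reviewers who want the new lifecycle in one place, here is a minimal standalone sketch of what this patch establishes: one enroll policy and one test policy per test execution, a unique namespace shared by the test policy and its data stream, and a teardown that deletes the per-test data stream instead of wiping documents from a shared one. The fleetClient and esClient interfaces, the runScenario helper, the runID value, and the logs-* data stream name below are hypothetical stand-ins for the Kibana/Fleet and Elasticsearch clients used in runner.go and for common.CreateTestRunID(); they are not elastic-package's actual API.

package sketch

import (
	"context"
	"fmt"
	"time"
)

// Policy is a reduced stand-in for kibana.Policy.
type Policy struct {
	ID, Name, Namespace string
}

// fleetClient and esClient model only the calls relevant to this change.
type fleetClient interface {
	CreatePolicy(ctx context.Context, name, namespace string) (Policy, error)
	DeletePolicy(ctx context.Context, id string) error
}

type esClient interface {
	DeleteDataStream(ctx context.Context, name string) error
}

// runScenario creates one enroll policy and one test policy per execution,
// both tagged with a unique namespace, and tears down by deleting the data
// stream created for this run rather than deleting its documents.
func runScenario(ctx context.Context, fleet fleetClient, es esClient, pkg, dataStream string, test func(ctx context.Context, testPolicy Policy) error) error {
	runID := fmt.Sprintf("ep%d", time.Now().UnixNano()) // stand-in for common.CreateTestRunID()

	enrollPolicy, err := fleet.CreatePolicy(ctx, fmt.Sprintf("ep-test-system-enroll-%s-%s", pkg, dataStream), runID)
	if err != nil {
		return fmt.Errorf("could not create enroll policy: %w", err)
	}
	testPolicy, err := fleet.CreatePolicy(ctx, fmt.Sprintf("ep-test-system-%s-%s", pkg, dataStream), runID)
	if err != nil {
		return fmt.Errorf("could not create test policy: %w", err)
	}

	defer func() {
		// Mirror the teardown order of tearDownTest: remove the data stream
		// written by this run first (a "logs" type data stream is assumed for
		// the sketch), then delete the per-test policies.
		if err := es.DeleteDataStream(ctx, fmt.Sprintf("logs-%s.%s-%s", pkg, dataStream, runID)); err != nil {
			fmt.Println("cleanup:", err)
		}
		for _, p := range []Policy{testPolicy, enrollPolicy} {
			if err := fleet.DeletePolicy(ctx, p.ID); err != nil {
				fmt.Println("cleanup:", err)
			}
		}
	}()

	return test(ctx, testPolicy)
}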