diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 72264b8eb..ab09c381f 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -1,6 +1,7 @@ env: SETUP_GVM_VERSION: 'v0.5.2' # https://github.com/andrewkroh/gvm/issues/44#issuecomment-1013231151 ELASTIC_PACKAGE_COMPOSE_DISABLE_VERBOSE_OUTPUT: "true" + ELASTIC_PACKAGE_MAXIMUM_NUMBER_PARALLEL_TESTS: 3 DOCKER_COMPOSE_VERSION: "v2.24.1" DOCKER_VERSION: "26.1.2" KIND_VERSION: 'v0.20.0' diff --git a/cmd/testrunner.go b/cmd/testrunner.go index b9b0ff2ec..1eb1b3aaf 100644 --- a/cmd/testrunner.go +++ b/cmd/testrunner.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "slices" + "sort" "strings" "github.com/spf13/cobra" @@ -165,9 +166,15 @@ func testRunnerAssetCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("can't create Kibana client: %w", err) } + globalTestConfig, err := testrunner.ReadGlobalTestConfig(packageRootPath) + if err != nil { + return fmt.Errorf("failed to read global config: %w", err) + } + runner := asset.NewAssetTestRunner(asset.AssetTestRunnerOptions{ - PackageRootPath: packageRootPath, - KibanaClient: kibanaClient, + PackageRootPath: packageRootPath, + KibanaClient: kibanaClient, + GlobalTestConfig: globalTestConfig.Asset, }) results, err := testrunner.RunSuite(ctx, runner) @@ -247,10 +254,16 @@ func testRunnerStaticCommandAction(cmd *cobra.Command, args []string) error { ctx, stop := signal.Enable(cmd.Context(), logger.Info) defer stop() + globalTestConfig, err := testrunner.ReadGlobalTestConfig(packageRootPath) + if err != nil { + return fmt.Errorf("failed to read global config: %w", err) + } + runner := static.NewStaticTestRunner(static.StaticTestRunnerOptions{ PackageRootPath: packageRootPath, DataStreams: dataStreams, FailOnMissingTests: failOnMissing, + GlobalTestConfig: globalTestConfig.Static, }) results, err := testrunner.RunSuite(ctx, runner) @@ -355,6 +368,11 @@ func testRunnerPipelineCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("reading package manifest failed (path: %s): %w", packageRootPath, err) } + globalTestConfig, err := testrunner.ReadGlobalTestConfig(packageRootPath) + if err != nil { + return fmt.Errorf("failed to read global config: %w", err) + } + runner := pipeline.NewPipelineTestRunner(pipeline.PipelineTestRunnerOptions{ Profile: profile, PackageRootPath: packageRootPath, @@ -365,6 +383,7 @@ func testRunnerPipelineCommandAction(cmd *cobra.Command, args []string) error { WithCoverage: testCoverage, CoverageType: testCoverageFormat, DeferCleanup: deferCleanup, + GlobalTestConfig: globalTestConfig.Pipeline, }) results, err := testrunner.RunSuite(ctx, runner) @@ -532,6 +551,11 @@ func testRunnerSystemCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("reading package manifest failed (path: %s): %w", packageRootPath, err) } + globalTestConfig, err := testrunner.ReadGlobalTestConfig(packageRootPath) + if err != nil { + return fmt.Errorf("failed to read global config: %w", err) + } + runner := system.NewSystemTestRunner(system.SystemTestRunnerOptions{ Profile: profile, PackageRootPath: packageRootPath, @@ -547,6 +571,7 @@ func testRunnerSystemCommandAction(cmd *cobra.Command, args []string) error { GenerateTestResult: generateTestResult, DeferCleanup: deferCleanup, RunIndependentElasticAgent: false, + GlobalTestConfig: globalTestConfig.System, }) logger.Debugf("Running suite...") @@ -646,12 +671,18 @@ func testRunnerPolicyCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("reading package manifest failed (path: %s): %w", packageRootPath, err) } + globalTestConfig, err := testrunner.ReadGlobalTestConfig(packageRootPath) + if err != nil { + return fmt.Errorf("failed to read global config: %w", err) + } + runner := policy.NewPolicyTestRunner(policy.PolicyTestRunnerOptions{ PackageRootPath: packageRootPath, KibanaClient: kibanaClient, DataStreams: dataStreams, FailOnMissingTests: failOnMissing, GenerateTestResult: generateTestResult, + GlobalTestConfig: globalTestConfig.Policy, }) results, err := testrunner.RunSuite(ctx, runner) @@ -663,6 +694,18 @@ func testRunnerPolicyCommandAction(cmd *cobra.Command, args []string) error { } func processResults(results []testrunner.TestResult, testType testrunner.TestType, reportFormat, reportOutput, packageRootPath, packageName, packageType, testCoverageFormat string, testCoverage bool) error { + sort.Slice(results, func(i, j int) bool { + if results[i].Package != results[j].Package { + return results[i].Package < results[j].Package + } + if results[i].TestType != results[j].TestType { + return results[i].TestType < results[j].TestType + } + if results[i].DataStream != results[j].DataStream { + return results[i].DataStream < results[j].DataStream + } + return results[i].Name < results[j].Name + }) format := testrunner.TestReportFormat(reportFormat) report, err := testrunner.FormatReport(format, results) if err != nil { diff --git a/docs/howto/asset_testing.md b/docs/howto/asset_testing.md index 45dc373ba..f979c45d6 100644 --- a/docs/howto/asset_testing.md +++ b/docs/howto/asset_testing.md @@ -49,3 +49,14 @@ Finally, when you are done running all asset loading tests, bring down the Elast ``` elastic-package stack down ``` + +## Global test configuration + +Each package could define a configuration file in `_dev/test/config.yml` to skip all the asset tests. + +```yaml +asset: + skip: + reason: + link: +``` \ No newline at end of file diff --git a/docs/howto/pipeline_testing.md b/docs/howto/pipeline_testing.md index 7a5517ef0..a59b21ce0 100644 --- a/docs/howto/pipeline_testing.md +++ b/docs/howto/pipeline_testing.md @@ -175,3 +175,14 @@ Finally, when you are done running all pipeline tests, bring down the Elastic St ``` elastic-package stack down ``` + +## Global test configuration + +Each package could define a configuration file in `_dev/test/config.yml` to skip all the pipeline tests. + +```yaml +pipeline: + skip: + reason: + link: +``` \ No newline at end of file diff --git a/docs/howto/policy_testing.md b/docs/howto/policy_testing.md index de16dd8c6..a0d7bcab1 100644 --- a/docs/howto/policy_testing.md +++ b/docs/howto/policy_testing.md @@ -41,6 +41,17 @@ It is possible, and encouraged, to define multiple policy tests for each package or data stream. +## Global test configuration + +Each package could define a configuration file in `_dev/test/config.yml` to skip all the policy tests. + +```yaml +policy: + skip: + reason: + link: +``` + ### Defining the configuration of the policy Test configuration for the policy is defined in a YAML file prefixed with diff --git a/docs/howto/static_testing.md b/docs/howto/static_testing.md index 185974c90..f479deb6c 100644 --- a/docs/howto/static_testing.md +++ b/docs/howto/static_testing.md @@ -26,3 +26,14 @@ If you want to run pipeline tests for **specific data streams** in a package, na ``` elastic-package test static --data-streams [,,...] ``` + +## Global test configuration + +Each package could define a configuration file in `_dev/test/config.yml` to skip all the static tests. + +```yaml +static: + skip: + reason: + link: +``` \ No newline at end of file diff --git a/docs/howto/system_testing.md b/docs/howto/system_testing.md index 07ec66d5a..b804f4e76 100644 --- a/docs/howto/system_testing.md +++ b/docs/howto/system_testing.md @@ -553,6 +553,20 @@ Placeholders used in the `test--config.yml` must be enclosed in `{{{` **NOTE**: Terraform variables in the form of environment variables (prefixed with `TF_VAR_`) are not injected and cannot be used as placeholder (their value will always be empty). +## Global test configuration + +Each package could define a configuration file in `_dev/test/config.yml` that allows to: +- skip all the system tests defined. +- set if these system tests should be running in parallel or not. + +```yaml +system: + parallel: true + skip: + reason: + link: +``` + ## Running a system test Once the two levels of configurations are defined as described in the previous section, you are ready to run system tests for a package's data streams. @@ -761,11 +775,36 @@ Considerations for this mode of running Elastic Agents: - Create a new `_dev/deploy/docker` adding the service container if needed. - Define the settings required for your Elastic Agents in all the test configuration files. +#### Running system tests in parallel (technical preview) + +By default, `elatic-package` runs every system test defined in the package sequentially. +This could be changed to allow running in parallel tests. For that it is needed: +- running tests using independent Elastic Agents (see [section](#running-system-tests-with-independent-elastic-agents-in-each-test-technical-preview)). +- package must define the global test configuration file with these contents to enable system test parallelization: + ```yaml + system: + parallel: true + ``` +- define how many tests in parallel should be running + - This is done defining the environment variable `ELASTIC_PACKAGE_MAXIMUM_NUMBER_PARALLEL_TESTS` + + +Given those requirements, this is an example to run system tests in parallel: +```shell +ELASTIC_PACKAGE_MAXIMUM_NUMBER_PARALLEL_TESTS=5 \ + ELASTIC_PACKAGE_TEST_ENABLE_INDEPENDENT_AGENT=true \ + elastic-package test system -v +``` + +**NOTE**: +- Currently, just system tests support to run tests in parallel. +- **Not recommended** to enable system tests in parallel for packages that make use of the Terraform or Kubernetes service deployers. + ### Detecting ignored fields As part of the system test, `elastic-package` checks whether any documents couldn't successfully map any fields. Common issues are the configured field limit being exceeded or keyword fields receiving values longer than `ignore_above`. You can learn more in the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-ignored-field.html). -In this case, `elastic-package test system` will fail with an error and print a sample of affected documents. To fix the issue, check which fields got ignored and the `ignored_field_values` and either adapt the mapping or the ingest pipeline to accomodate for the problematic values. In case an ignored field can't be meaningfully mitigated, it's possible to skip the check by listing the field under the `skip_ignored_fields` property in the system test config of the data stream: +In this case, `elastic-package test system` will fail with an error and print a sample of affected documents. To fix the issue, check which fields got ignored and the `ignored_field_values` and either adapt the mapping or the ingest pipeline to accommodate for the problematic values. In case an ignored field can't be meaningfully mitigated, it's possible to skip the check by listing the field under the `skip_ignored_fields` property in the system test config of the data stream: ``` # data_stream//_dev/test/system/test-default-config.yml skip_ignored_fields: diff --git a/internal/compose/compose.go b/internal/compose/compose.go index 0f9b0c21e..cf8f8a24a 100644 --- a/internal/compose/compose.go +++ b/internal/compose/compose.go @@ -370,29 +370,33 @@ func (p *Project) WaitForHealthy(ctx context.Context, opts CommandOptions) error } for _, containerDescription := range descriptions { - logger.Debugf("Container status: %s", containerDescription.String()) // No healthcheck defined for service if containerDescription.State.Status == "running" && containerDescription.State.Health == nil { + logger.Debugf("Container %s status: %s (no health status)", containerDescription.ID, containerDescription.State.Status) continue } // Service is up and running and it's healthy if containerDescription.State.Status == "running" && containerDescription.State.Health.Status == "healthy" { + logger.Debugf("Container %s status: %s (health: %s)", containerDescription.ID, containerDescription.State.Status, containerDescription.State.Health.Status) continue } // Container started and finished with exit code 0 if containerDescription.State.Status == "exited" && containerDescription.State.ExitCode == 0 { + logger.Debugf("Container %s status: %s (exit code: %d)", containerDescription.ID, containerDescription.State.Status, containerDescription.State.ExitCode) continue } // Container exited with code > 0 if containerDescription.State.Status == "exited" && containerDescription.State.ExitCode > 0 { + logger.Debugf("Container %s status: %s (exit code: %d)", containerDescription.ID, containerDescription.State.Status, containerDescription.State.ExitCode) return fmt.Errorf("container (ID: %s) exited with code %d", containerDescription.ID, containerDescription.State.ExitCode) } // Any different status is considered unhealthy + logger.Debugf("Container %s status: unhealthy", containerDescription.ID) healthy = false } diff --git a/internal/kibana/agents.go b/internal/kibana/agents.go index 5014a852f..041d27e16 100644 --- a/internal/kibana/agents.go +++ b/internal/kibana/agents.go @@ -118,7 +118,8 @@ func (c *Client) waitUntilPolicyAssigned(ctx context.Context, a Agent, p Policy) if err != nil { return fmt.Errorf("can't get the agent: %w", err) } - logger.Debugf("Agent data: %s", agent.String()) + logger.Debugf("Agent %s (Host: %s): Policy ID %s LogLevel: %s Status: %s", + agent.ID, agent.LocalMetadata.Host.Name, agent.PolicyID, agent.LocalMetadata.Elastic.Agent.LogLevel, agent.Status) if agent.PolicyID == p.ID && agent.PolicyRevision >= p.Revision { logger.Debugf("Policy revision assigned to the agent (ID: %s)...", a.ID) diff --git a/internal/testrunner/globaltestconfig.go b/internal/testrunner/globaltestconfig.go new file mode 100644 index 000000000..d5539e287 --- /dev/null +++ b/internal/testrunner/globaltestconfig.go @@ -0,0 +1,51 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package testrunner + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/elastic/go-ucfg" + "github.com/elastic/go-ucfg/yaml" +) + +type globalTestConfig struct { + Asset GlobalRunnerTestConfig `config:"asset"` + Pipeline GlobalRunnerTestConfig `config:"pipeline"` + Policy GlobalRunnerTestConfig `config:"policy"` + Static GlobalRunnerTestConfig `config:"static"` + System GlobalRunnerTestConfig `config:"system"` +} + +type GlobalRunnerTestConfig struct { + Parallel bool `config:"parallel"` + SkippableConfig `config:",inline"` +} + +func ReadGlobalTestConfig(packageRootPath string) (*globalTestConfig, error) { + configFilePath := filepath.Join(packageRootPath, "_dev", "test", "config.yml") + + data, err := os.ReadFile(configFilePath) + if errors.Is(err, os.ErrNotExist) { + return &globalTestConfig{}, nil + } + if err != nil { + return nil, fmt.Errorf("failed to read %s: %w", configFilePath, err) + } + + var c globalTestConfig + cfg, err := yaml.NewConfig(data, ucfg.PathSep(".")) + if err != nil { + return nil, fmt.Errorf("unable to load global test configuration file: %s: %w", configFilePath, err) + } + if err := cfg.Unpack(&c); err != nil { + return nil, fmt.Errorf("unable to unpack global test configuration file: %s: %w", configFilePath, err) + } + + return &c, nil +} diff --git a/internal/testrunner/runners/asset/runner.go b/internal/testrunner/runners/asset/runner.go index 4fecc2056..ddf1e98c4 100644 --- a/internal/testrunner/runners/asset/runner.go +++ b/internal/testrunner/runners/asset/runner.go @@ -17,19 +17,22 @@ const ( ) type runner struct { - packageRootPath string - kibanaClient *kibana.Client + packageRootPath string + kibanaClient *kibana.Client + globalTestConfig testrunner.GlobalRunnerTestConfig } type AssetTestRunnerOptions struct { - PackageRootPath string - KibanaClient *kibana.Client + PackageRootPath string + KibanaClient *kibana.Client + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewAssetTestRunner(options AssetTestRunnerOptions) *runner { runner := runner{ - packageRootPath: options.PackageRootPath, - kibanaClient: options.KibanaClient, + packageRootPath: options.PackageRootPath, + kibanaClient: options.KibanaClient, + globalTestConfig: options.GlobalTestConfig, } return &runner } @@ -53,9 +56,10 @@ func (r *runner) TearDownRunner(ctx context.Context) error { func (r *runner) GetTests(ctx context.Context) ([]testrunner.Tester, error) { testers := []testrunner.Tester{ NewAssetTester(AssetTesterOptions{ - PackageRootPath: r.packageRootPath, - KibanaClient: r.kibanaClient, - TestFolder: testrunner.TestFolder{Package: r.packageRootPath}, + PackageRootPath: r.packageRootPath, + KibanaClient: r.kibanaClient, + TestFolder: testrunner.TestFolder{Package: r.packageRootPath}, + GlobalTestConfig: r.globalTestConfig, }), } return testers, nil diff --git a/internal/testrunner/runners/asset/tester.go b/internal/testrunner/runners/asset/tester.go index 775904620..b1aa14e71 100644 --- a/internal/testrunner/runners/asset/tester.go +++ b/internal/testrunner/runners/asset/tester.go @@ -22,19 +22,22 @@ type tester struct { packageRootPath string kibanaClient *kibana.Client resourcesManager *resources.Manager + globalTestConfig testrunner.GlobalRunnerTestConfig } type AssetTesterOptions struct { - TestFolder testrunner.TestFolder - PackageRootPath string - KibanaClient *kibana.Client + TestFolder testrunner.TestFolder + PackageRootPath string + KibanaClient *kibana.Client + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewAssetTester(options AssetTesterOptions) *tester { tester := tester{ - testFolder: options.TestFolder, - packageRootPath: options.PackageRootPath, - kibanaClient: options.KibanaClient, + testFolder: options.TestFolder, + packageRootPath: options.PackageRootPath, + kibanaClient: options.KibanaClient, + globalTestConfig: options.GlobalTestConfig, } manager := resources.NewManager() @@ -57,6 +60,12 @@ func (r tester) String() string { return "asset loading" } +// Parallel indicates if this tester can run in parallel or not. +func (r tester) Parallel() bool { + // Not supported yet parallel tests even if it is indicated in the global config r.globalTestConfig + return false +} + // Run runs the asset loading tests func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { return r.run(ctx) @@ -87,11 +96,16 @@ func (r *tester) run(ctx context.Context) ([]testrunner.TestResult, error) { return result.WithError(fmt.Errorf("unable to load asset loading test config file: %w", err)) } - if testConfig != nil && testConfig.Skip != nil { - logger.Warnf("skipping %s test for %s: %s (details: %s)", - TestType, r.testFolder.Package, - testConfig.Skip.Reason, testConfig.Skip.Link) - return result.WithSkip(testConfig.Skip) + skipConfigs := []*testrunner.SkipConfig{r.globalTestConfig.Skip} + if testConfig != nil { + skipConfigs = append(skipConfigs, testConfig.Skip) + } + + if skip := testrunner.AnySkipConfig(skipConfigs...); skip != nil { + logger.Warnf("skipping %s test for %s/%s: %s (details: %s)", + TestType, r.testFolder.Package, r.testFolder.DataStream, + skip.Reason, skip.Link) + return result.WithSkip(skip) } logger.Debug("installing package...") diff --git a/internal/testrunner/runners/pipeline/runner.go b/internal/testrunner/runners/pipeline/runner.go index e6eeb3ae1..01de91c64 100644 --- a/internal/testrunner/runners/pipeline/runner.go +++ b/internal/testrunner/runners/pipeline/runner.go @@ -31,9 +31,10 @@ type runner struct { failOnMissingTests bool generateTestResult bool - withCoverage bool - coverageType string - deferCleanup time.Duration + withCoverage bool + coverageType string + deferCleanup time.Duration + globalTestConfig testrunner.GlobalRunnerTestConfig } type PipelineTestRunnerOptions struct { @@ -46,6 +47,7 @@ type PipelineTestRunnerOptions struct { WithCoverage bool CoverageType string DeferCleanup time.Duration + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewPipelineTestRunner(options PipelineTestRunnerOptions) *runner { @@ -59,6 +61,7 @@ func NewPipelineTestRunner(options PipelineTestRunnerOptions) *runner { withCoverage: options.WithCoverage, coverageType: options.CoverageType, deferCleanup: options.DeferCleanup, + globalTestConfig: options.GlobalTestConfig, } return &runner } @@ -134,6 +137,7 @@ func (r *runner) GetTests(ctx context.Context) ([]testrunner.Tester, error) { Profile: r.profile, API: r.esAPI, TestCaseFile: caseFile, + GlobalTestConfig: r.globalTestConfig, }) if err != nil { return nil, fmt.Errorf("failed to create pipeline tester: %w", err) diff --git a/internal/testrunner/runners/pipeline/tester.go b/internal/testrunner/runners/pipeline/tester.go index 4fda95057..84e838f7a 100644 --- a/internal/testrunner/runners/pipeline/tester.go +++ b/internal/testrunner/runners/pipeline/tester.go @@ -45,6 +45,7 @@ type tester struct { generateTestResult bool withCoverage bool coverageType string + globalTestConfig testrunner.GlobalRunnerTestConfig testCaseFile string @@ -65,6 +66,7 @@ type PipelineTesterOptions struct { WithCoverage bool CoverageType string TestCaseFile string + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewPipelineTester(options PipelineTesterOptions) (*tester, error) { @@ -78,6 +80,7 @@ func NewPipelineTester(options PipelineTesterOptions) (*tester, error) { generateTestResult: options.GenerateTestResult, withCoverage: options.WithCoverage, coverageType: options.CoverageType, + globalTestConfig: options.GlobalTestConfig, } stackConfig, err := stack.LoadConfig(r.profile) @@ -127,6 +130,12 @@ func (r *tester) String() string { return "pipeline" } +// Parallel indicates if this tester can run in parallel or not. +func (r tester) Parallel() bool { + // Not supported yet parallel tests even if it is indicated in the global config r.globalTestConfig + return false +} + // Run runs the pipeline tests defined under the given folder func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { return r.run(ctx) @@ -307,12 +316,11 @@ func (r *tester) runTestCase(ctx context.Context, testCaseFile string, dsPath st } tr.Name = tc.name - if tc.config.Skip != nil { + if skip := testrunner.AnySkipConfig(tc.config.Skip, r.globalTestConfig.Skip); skip != nil { logger.Warnf("skipping %s test for %s/%s: %s (details: %s)", TestType, r.testFolder.Package, r.testFolder.DataStream, - tc.config.Skip.Reason, tc.config.Skip.Link) - - tr.Skipped = tc.config.Skip + skip.Reason, skip.Link) + tr.Skipped = skip return tr, nil } diff --git a/internal/testrunner/runners/policy/runner.go b/internal/testrunner/runners/policy/runner.go index 3d0cb9e8a..a563cca17 100644 --- a/internal/testrunner/runners/policy/runner.go +++ b/internal/testrunner/runners/policy/runner.go @@ -28,6 +28,7 @@ type runner struct { dataStreams []string failOnMissingTests bool generateTestResult bool + globalTestConfig testrunner.GlobalRunnerTestConfig resourcesManager *resources.Manager cleanup func(context.Context) error @@ -42,6 +43,7 @@ type PolicyTestRunnerOptions struct { DataStreams []string FailOnMissingTests bool GenerateTestResult bool + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewPolicyTestRunner(options PolicyTestRunnerOptions) *runner { @@ -51,6 +53,7 @@ func NewPolicyTestRunner(options PolicyTestRunnerOptions) *runner { dataStreams: options.DataStreams, failOnMissingTests: options.FailOnMissingTests, generateTestResult: options.GenerateTestResult, + globalTestConfig: options.GlobalTestConfig, } runner.resourcesManager = resources.NewManager() runner.resourcesManager.RegisterProvider(resources.DefaultKibanaProviderName, &resources.KibanaProvider{Client: runner.kibanaClient}) @@ -131,6 +134,7 @@ func (r *runner) GetTests(ctx context.Context) ([]testrunner.Tester, error) { KibanaClient: r.kibanaClient, GenerateTestResult: r.generateTestResult, TestPath: test, + GlobalTestConfig: r.globalTestConfig, })) } diff --git a/internal/testrunner/runners/policy/testconfig.go b/internal/testrunner/runners/policy/testconfig.go index 437d1559b..acae9a1a7 100644 --- a/internal/testrunner/runners/policy/testconfig.go +++ b/internal/testrunner/runners/policy/testconfig.go @@ -9,9 +9,13 @@ import ( "os" "gopkg.in/yaml.v3" + + "github.com/elastic/elastic-package/internal/testrunner" ) type testConfig struct { + testrunner.SkippableConfig `config:",inline"` + Input string `yaml:"input,omitempty"` Vars map[string]any `yaml:"vars,omitempty"` DataStream struct { diff --git a/internal/testrunner/runners/policy/tester.go b/internal/testrunner/runners/policy/tester.go index 670e994fa..e0c277443 100644 --- a/internal/testrunner/runners/policy/tester.go +++ b/internal/testrunner/runners/policy/tester.go @@ -18,11 +18,13 @@ import ( ) type tester struct { - testFolder testrunner.TestFolder - packageRootPath string + testFolder testrunner.TestFolder + packageRootPath string + kibanaClient *kibana.Client + testPath string + generateTestResult bool - kibanaClient *kibana.Client - testPath string + globalTestConfig testrunner.GlobalRunnerTestConfig resourcesManager *resources.Manager } @@ -36,6 +38,7 @@ type PolicyTesterOptions struct { KibanaClient *kibana.Client PackageRootPath string GenerateTestResult bool + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewPolicyTester(options PolicyTesterOptions) *tester { @@ -45,6 +48,7 @@ func NewPolicyTester(options PolicyTesterOptions) *tester { packageRootPath: options.PackageRootPath, generateTestResult: options.GenerateTestResult, testPath: options.TestPath, + globalTestConfig: options.GlobalTestConfig, } tester.resourcesManager = resources.NewManager() tester.resourcesManager.RegisterProvider(resources.DefaultKibanaProviderName, &resources.KibanaProvider{Client: tester.kibanaClient}) @@ -59,6 +63,12 @@ func (r *tester) String() string { return string(TestType) } +// Parallel indicates if this tester can run in parallel or not. +func (r tester) Parallel() bool { + // Not supported yet parallel tests even if it is indicated in the global config r.globalTestConfig + return false +} + func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { var results []testrunner.TestResult @@ -85,6 +95,14 @@ func (r *tester) runTest(ctx context.Context, manager *resources.Manager, testPa } testName := testNameFromPath(testPath) + + if skip := testrunner.AnySkipConfig(testConfig.Skip, r.globalTestConfig.Skip); skip != nil { + logger.Warnf("skipping %s test for %s/%s: %s (details: %s)", + TestType, r.testFolder.Package, r.testFolder.DataStream, + skip.Reason, skip.Link) + return result.WithSkip(skip) + } + policy := resources.FleetAgentPolicy{ Name: testName, Namespace: "ep", diff --git a/internal/testrunner/runners/static/runner.go b/internal/testrunner/runners/static/runner.go index a4ae6a0b8..54b228c01 100644 --- a/internal/testrunner/runners/static/runner.go +++ b/internal/testrunner/runners/static/runner.go @@ -25,12 +25,14 @@ type runner struct { packageRootPath string failOnMissingTests bool dataStreams []string + globalTestConfig testrunner.GlobalRunnerTestConfig } type StaticTestRunnerOptions struct { PackageRootPath string FailOnMissingTests bool DataStreams []string + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewStaticTestRunner(options StaticTestRunnerOptions) *runner { @@ -38,6 +40,7 @@ func NewStaticTestRunner(options StaticTestRunnerOptions) *runner { packageRootPath: options.PackageRootPath, failOnMissingTests: options.FailOnMissingTests, dataStreams: options.DataStreams, + globalTestConfig: options.GlobalTestConfig, } return &runner } @@ -89,8 +92,9 @@ func (r *runner) GetTests(ctx context.Context) ([]testrunner.Tester, error) { var testers []testrunner.Tester for _, t := range tests { testers = append(testers, NewStaticTester(StaticTesterOptions{ - PackageRootPath: r.packageRootPath, - TestFolder: t, + PackageRootPath: r.packageRootPath, + TestFolder: t, + GlobalTestConfig: r.globalTestConfig, })) } return testers, nil diff --git a/internal/testrunner/runners/static/tester.go b/internal/testrunner/runners/static/tester.go index d5cd1d48f..2c912e2ed 100644 --- a/internal/testrunner/runners/static/tester.go +++ b/internal/testrunner/runners/static/tester.go @@ -20,18 +20,21 @@ import ( ) type tester struct { - testFolder testrunner.TestFolder - packageRootPath string + testFolder testrunner.TestFolder + packageRootPath string + globalTestConfig testrunner.GlobalRunnerTestConfig } type StaticTesterOptions struct { - TestFolder testrunner.TestFolder - PackageRootPath string + TestFolder testrunner.TestFolder + PackageRootPath string + GlobalTestConfig testrunner.GlobalRunnerTestConfig } func NewStaticTester(options StaticTesterOptions) *tester { runner := tester{ - testFolder: options.TestFolder, - packageRootPath: options.PackageRootPath, + testFolder: options.TestFolder, + packageRootPath: options.PackageRootPath, + globalTestConfig: options.GlobalTestConfig, } return &runner } @@ -47,6 +50,12 @@ func (r tester) String() string { return "static files" } +// Parallel indicates if this tester can run in parallel or not. +func (r tester) Parallel() bool { + // Not supported yet parallel tests even if it is indicated in the global config r.globalTestConfig + return false +} + func (r tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { return r.run(ctx) } @@ -63,11 +72,16 @@ func (r tester) run(ctx context.Context) ([]testrunner.TestResult, error) { return result.WithError(fmt.Errorf("unable to load asset loading test config file: %w", err)) } - if testConfig != nil && testConfig.Skip != nil { - logger.Warnf("skipping %s test for %s: %s (details: %s)", - TestType, r.testFolder.Package, - testConfig.Skip.Reason, testConfig.Skip.Link) - return result.WithSkip(testConfig.Skip) + skipConfigs := []*testrunner.SkipConfig{r.globalTestConfig.Skip} + if testConfig != nil { + skipConfigs = append(skipConfigs, testConfig.Skip) + } + + if skip := testrunner.AnySkipConfig(skipConfigs...); skip != nil { + logger.Warnf("skipping %s test for %s/%s: %s (details: %s)", + TestType, r.testFolder.Package, r.testFolder.DataStream, + skip.Reason, skip.Link) + return result.WithSkip(skip) } pkgManifest, err := packages.ReadPackageManifestFromPackageRoot(r.packageRootPath) diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index f04ade1d2..553b95379 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -32,6 +32,7 @@ type runner struct { dataStreams []string serviceVariant string + globalTestConfig testrunner.GlobalRunnerTestConfig failOnMissingTests bool generateTestResult bool runIndependentElasticAgent bool @@ -63,6 +64,8 @@ type SystemTestRunnerOptions struct { RunTestsOnly bool ConfigFilePath string + GlobalTestConfig testrunner.GlobalRunnerTestConfig + FailOnMissingTests bool GenerateTestResult bool RunIndependentElasticAgent bool @@ -85,6 +88,7 @@ func NewSystemTestRunner(options SystemTestRunnerOptions) *runner { generateTestResult: options.GenerateTestResult, runIndependentElasticAgent: options.RunIndependentElasticAgent, deferCleanup: options.DeferCleanup, + globalTestConfig: options.GlobalTestConfig, } r.resourcesManager = resources.NewManager() @@ -247,6 +251,7 @@ func (r *runner) GetTests(ctx context.Context) ([]testrunner.Tester, error) { RunTearDown: r.runTearDown, ConfigFileName: config, RunIndependentElasticAgent: r.runIndependentElasticAgent, + GlobalTestConfig: r.globalTestConfig, }) if err != nil { return nil, fmt.Errorf( diff --git a/internal/testrunner/runners/system/tester.go b/internal/testrunner/runners/system/tester.go index 9a16436d1..c7f98999d 100644 --- a/internal/testrunner/runners/system/tester.go +++ b/internal/testrunner/runners/system/tester.go @@ -145,13 +145,17 @@ type tester struct { pipelines []ingest.Pipeline - dataStreamPath string - stackVersion kibana.VersionInfo - locationManager *locations.LocationManager - resourcesManager *resources.Manager + dataStreamPath string + stackVersion kibana.VersionInfo + locationManager *locations.LocationManager + resourcesManager *resources.Manager + pkgManifest *packages.PackageManifest + dataStreamManifest *packages.DataStreamManifest serviceStateFilePath string + globalTestConfig testrunner.GlobalRunnerTestConfig + // Execution order of following handlers is defined in runner.TearDown() method. removeAgentHandler func(context.Context) error deleteTestPolicyHandler func(context.Context) error @@ -172,9 +176,10 @@ type SystemTesterOptions struct { RunIndependentElasticAgent bool - DeferCleanup time.Duration - ServiceVariant string - ConfigFileName string + DeferCleanup time.Duration + ServiceVariant string + ConfigFileName string + GlobalTestConfig testrunner.GlobalRunnerTestConfig RunSetup bool RunTearDown bool @@ -196,6 +201,7 @@ func NewSystemTester(options SystemTesterOptions) (*tester, error) { runSetup: options.RunSetup, runTestsOnly: options.RunTestsOnly, runTearDown: options.RunTearDown, + globalTestConfig: options.GlobalTestConfig, } r.resourcesManager = resources.NewManager() r.resourcesManager.RegisterProvider(resources.DefaultKibanaProviderName, &resources.KibanaProvider{Client: r.kibanaClient}) @@ -203,22 +209,16 @@ func NewSystemTester(options SystemTesterOptions) (*tester, error) { r.serviceStateFilePath = filepath.Join(stateFolderPath(r.profile.ProfilePath), serviceStateFileName) var err error - var found bool r.locationManager, err = locations.NewLocationManager() if err != nil { return nil, fmt.Errorf("reading service logs directory failed: %w", err) } - r.dataStreamPath, found, err = packages.FindDataStreamRootForPath(r.testFolder.Path) + r.dataStreamPath, _, err = packages.FindDataStreamRootForPath(r.testFolder.Path) if err != nil { return nil, fmt.Errorf("locating data stream root failed: %w", err) } - if found { - logger.Debugf("Running system tests for data stream %q", r.testFolder.DataStream) - } else { - logger.Debug("Running system tests for package") - } if r.esAPI == nil { return nil, errors.New("missing Elasticsearch client") @@ -231,6 +231,30 @@ func NewSystemTester(options SystemTesterOptions) (*tester, error) { if err != nil { return nil, fmt.Errorf("cannot request Kibana version: %w", err) } + + r.pkgManifest, err = packages.ReadPackageManifestFromPackageRoot(r.packageRootPath) + if err != nil { + return nil, fmt.Errorf("reading package manifest failed: %w", err) + } + + r.dataStreamManifest, err = packages.ReadDataStreamManifest(filepath.Join(r.dataStreamPath, packages.DataStreamManifestFile)) + if err != nil { + return nil, fmt.Errorf("reading data stream manifest failed: %w", err) + } + + // Temporarily until independent Elastic Agents are enabled by default, + // enable independent Elastic Agents if package defines that requires root privileges + if pkg, ds := r.pkgManifest, r.dataStreamManifest; pkg.Agent.Privileges.Root || (ds != nil && ds.Agent.Privileges.Root) { + r.runIndependentElasticAgent = true + } + + // If the environment variable is present, it always has preference over the root + // privileges value (if any) defined in the manifest file + v, ok := os.LookupEnv(enableIndependentAgents) + if ok { + r.runIndependentElasticAgent = strings.ToLower(v) == "true" + } + return &r, nil } @@ -247,6 +271,12 @@ func (r *tester) String() string { return "system" } +// Parallel indicates if this tester can run in parallel or not. +func (r tester) Parallel() bool { + // it is required independent Elastic Agents to run in parallel system tests + return r.runIndependentElasticAgent && r.globalTestConfig.Parallel +} + // Run runs the system tests defined under the given folder func (r *tester) Run(ctx context.Context) ([]testrunner.TestResult, error) { if !r.runSetup && !r.runTearDown && !r.runTestsOnly { @@ -505,12 +535,19 @@ func (r *tester) run(ctx context.Context) (results []testrunner.TestResult, err startTesting := time.Now() - partial, err := r.runTestPerVariant(ctx, result, r.configFileName, r.serviceVariant) - results = append(results, partial...) + results, err = r.runTestPerVariant(ctx, result, r.configFileName, r.serviceVariant) if err != nil { return results, err } + // Every tester is in charge of just one test, so if there is no error, + // then there should be just one result for tests. As an exception, there could + // be two results if there is any issue checking Elastic Agent logs. + if len(results) > 0 && results[0].Skipped != nil { + logger.Debugf("Test skipped, avoid checking agent logs") + return results, nil + } + tempDir, err := os.MkdirTemp("", "test-system-") if err != nil { return nil, fmt.Errorf("can't create temporal directory: %w", err) @@ -728,8 +765,6 @@ func (r *tester) getDocs(ctx context.Context, dataStream string) (*hits, error) type scenarioTest struct { dataStream string policyTemplateName string - pkgManifest *packages.PackageManifest - dataStreamManifest *packages.DataStreamManifest kibanaDataStream kibana.PackageDataStream syntheticEnabled bool docs []common.MapStr @@ -773,40 +808,18 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf } } - scenario.pkgManifest, err = packages.ReadPackageManifestFromPackageRoot(r.packageRootPath) - if err != nil { - return nil, fmt.Errorf("reading package manifest failed: %w", err) - } - - scenario.dataStreamManifest, err = packages.ReadDataStreamManifest(filepath.Join(r.dataStreamPath, packages.DataStreamManifestFile)) - if err != nil { - return nil, fmt.Errorf("reading data stream manifest failed: %w", err) - } - - // Temporarily until independent Elastic Agents are enabled by default, - // enable independent Elastic Agents if package defines that requires root privileges - if pkg, ds := scenario.pkgManifest, scenario.dataStreamManifest; pkg.Agent.Privileges.Root || (ds != nil && ds.Agent.Privileges.Root) { - r.runIndependentElasticAgent = true - } - - // If the environment variable is present, it always has preference over the root - // privileges value (if any) defined in the manifest file - v, ok := os.LookupEnv(enableIndependentAgents) - if ok { - r.runIndependentElasticAgent = strings.ToLower(v) == "true" - } serviceOptions.DeployIndependentAgent = r.runIndependentElasticAgent policyTemplateName := config.PolicyTemplate if policyTemplateName == "" { - policyTemplateName, err = findPolicyTemplateForInput(*scenario.pkgManifest, *scenario.dataStreamManifest, config.Input) + policyTemplateName, err = findPolicyTemplateForInput(*r.pkgManifest, *r.dataStreamManifest, config.Input) if err != nil { return nil, fmt.Errorf("failed to determine the associated policy_template: %w", err) } } scenario.policyTemplateName = policyTemplateName - policyTemplate, err := selectPolicyTemplateByName(scenario.pkgManifest.PolicyTemplates, scenario.policyTemplateName) + policyTemplate, err := selectPolicyTemplateByName(r.pkgManifest.PolicyTemplates, scenario.policyTemplateName) if err != nil { return nil, fmt.Errorf("failed to find the selected policy_template: %w", err) } @@ -897,7 +910,7 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf policy = policyCurrent } - agentDeployed, agentInfo, err := r.setupAgent(ctx, config, serviceStateData, policy, scenario.pkgManifest.Agent) + agentDeployed, agentInfo, err := r.setupAgent(ctx, config, serviceStateData, policy, r.pkgManifest.Agent) if err != nil { return nil, err } @@ -922,7 +935,7 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf scenario.startTestTime = time.Now() logger.Debug("adding package data stream to test policy...") - ds := createPackageDatastream(*policyToTest, *scenario.pkgManifest, policyTemplate, *scenario.dataStreamManifest, *config, policyToTest.Namespace) + ds := createPackageDatastream(*policyToTest, *r.pkgManifest, policyTemplate, *r.dataStreamManifest, *config, policyToTest.Namespace) if r.runTearDown { logger.Debug("Skip adding data stream config to policy") } else { @@ -937,7 +950,7 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf // Input packages can set `data_stream.dataset` by convention to customize the dataset. dataStreamDataset := ds.Inputs[0].Streams[0].DataStream.Dataset - if scenario.pkgManifest.Type == "input" { + if r.pkgManifest.Type == "input" { v, _ := config.Vars.GetValue("data_stream.dataset") if dataset, ok := v.(string); ok && dataset != "" { dataStreamDataset = dataset @@ -966,6 +979,8 @@ func (r *tester) prepareScenario(ctx context.Context, config *testConfig, svcInf // FIXME: running per stages does not work when multiple agents are created var origPolicy kibana.Policy + // While there could be created Elastic Agents within `setupService()` (custom agents and k8s agents), + // this "checkEnrolledAgents" call to must be located after creating the service. agents, err := checkEnrolledAgents(ctx, r.kibanaClient, agentInfo, svcInfo, r.runIndependentElasticAgent) if err != nil { return nil, fmt.Errorf("can't check enrolled agents: %w", err) @@ -1185,6 +1200,9 @@ func (r *tester) setupService(ctx context.Context, config *testConfig, serviceOp if r.runTestsOnly { return nil } + if service == nil { + return nil + } logger.Debug("tearing down service...") if err := service.TearDown(ctx); err != nil { return fmt.Errorf("error tearing down service: %w", err) @@ -1192,6 +1210,7 @@ func (r *tester) setupService(ctx context.Context, config *testConfig, serviceOp return nil } + return service, service.Info(), nil } @@ -1284,13 +1303,13 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re if expectedDatasets == nil { var expectedDataset string if ds := r.testFolder.DataStream; ds != "" { - expectedDataset = getDataStreamDataset(*scenario.pkgManifest, *scenario.dataStreamManifest) + expectedDataset = getDataStreamDataset(*r.pkgManifest, *r.dataStreamManifest) } else { - expectedDataset = scenario.pkgManifest.Name + "." + scenario.policyTemplateName + expectedDataset = r.pkgManifest.Name + "." + scenario.policyTemplateName } expectedDatasets = []string{expectedDataset} } - if scenario.pkgManifest.Type == "input" { + if r.pkgManifest.Type == "input" { v, _ := config.Vars.GetValue("data_stream.dataset") if dataset, ok := v.(string); ok && dataset != "" { expectedDatasets = append(expectedDatasets, dataset) @@ -1298,7 +1317,7 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re } fieldsValidator, err := fields.CreateValidatorForDirectory(r.dataStreamPath, - fields.WithSpecVersion(scenario.pkgManifest.SpecVersion), + fields.WithSpecVersion(r.pkgManifest.SpecVersion), fields.WithNumericKeywordFields(config.NumericKeywordFields), fields.WithExpectedDatasets(expectedDatasets), fields.WithEnabledImportAllECSSChema(true), @@ -1324,9 +1343,9 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re } } - specVersion, err := semver.NewVersion(scenario.pkgManifest.SpecVersion) + specVersion, err := semver.NewVersion(r.pkgManifest.SpecVersion) if err != nil { - return result.WithError(fmt.Errorf("failed to parse format version %q: %w", scenario.pkgManifest.SpecVersion, err)) + return result.WithError(fmt.Errorf("failed to parse format version %q: %w", r.pkgManifest.SpecVersion, err)) } // Write sample events file from first doc, if requested @@ -1340,7 +1359,7 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re } // Check transforms if present - if err := r.checkTransforms(ctx, config, scenario.pkgManifest, scenario.kibanaDataStream, scenario.dataStream); err != nil { + if err := r.checkTransforms(ctx, config, r.pkgManifest, scenario.kibanaDataStream, scenario.dataStream); err != nil { return result.WithError(err) } @@ -1360,11 +1379,11 @@ func (r *tester) validateTestScenario(ctx context.Context, result *testrunner.Re func (r *tester) runTest(ctx context.Context, config *testConfig, svcInfo servicedeployer.ServiceInfo) ([]testrunner.TestResult, error) { result := r.newResult(config.Name()) - if config.Skip != nil { + if skip := testrunner.AnySkipConfig(config.Skip, r.globalTestConfig.Skip); skip != nil { logger.Warnf("skipping %s test for %s/%s: %s (details: %s)", TestType, r.testFolder.Package, r.testFolder.DataStream, - config.Skip.Reason, config.Skip.Link) - return result.WithSkip(config.Skip) + skip.Reason, skip.Link) + return result.WithSkip(skip) } logger.Debugf("running test with configuration '%s'", config.Name()) diff --git a/internal/testrunner/testrunner.go b/internal/testrunner/testrunner.go index 3e56e840c..13acf971e 100644 --- a/internal/testrunner/testrunner.go +++ b/internal/testrunner/testrunner.go @@ -10,17 +10,26 @@ import ( "fmt" "os" "path/filepath" + "runtime" "sort" + "strconv" "strings" + "sync" "time" "github.com/elastic/elastic-package/internal/elasticsearch" + "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/profile" ) +var ( + defaultMaximumRoutines = runtime.GOMAXPROCS(0) / 2 + maximumNumberParallelTest = environment.WithElasticPackagePrefix("MAXIMUM_NUMBER_PARALLEL_TESTS") +) + // TestType represents the various supported test types type TestType string @@ -60,6 +69,9 @@ type Tester interface { // TearDown cleans up any test runner resources. It must be called // after the test runner has finished executing. TearDown(context.Context) error + + // Parallel indicates if this test can be run in parallel or not + Parallel() bool } type TesterFactory func(TestFolder) (Tester, error) @@ -303,32 +315,135 @@ func RunSuite(ctx context.Context, runner TestRunner) ([]TestResult, error) { cleanupCtx := context.WithoutCancel(ctx) tdErr := runner.TearDownRunner(cleanupCtx) if tdErr != nil { - logger.Debugf("failed to tear down %s runner: %w", runner.Type(), tdErr) + logger.Debugf("failed to tear down %s runner: %s", runner.Type(), tdErr) } return nil, fmt.Errorf("failed to setup %s runner: %w", runner.Type(), err) } - results, err := runWithFactory(ctx, testers) + + var parallelTesters, sequentialTesters []Tester + for _, tester := range testers { + if tester.Parallel() { + parallelTesters = append(parallelTesters, tester) + } else { + sequentialTesters = append(sequentialTesters, tester) + } + } + + var allResults, results []TestResult + var parallelErr, sequentialErr error + + results, parallelErr = runSuiteParallel(ctx, parallelTesters) + allResults = append(allResults, results...) + + results, sequentialErr = runSuite(ctx, sequentialTesters) + allResults = append(allResults, results...) // Avoid cancellations during cleanup. cleanupCtx := context.WithoutCancel(ctx) tdErr := runner.TearDownRunner(cleanupCtx) if tdErr != nil { - return results, fmt.Errorf("failed to tear down %s runner: %w", runner.Type(), err) + return allResults, fmt.Errorf("failed to tear down %s runner: %w", runner.Type(), tdErr) + } + + if parallelErr != nil { + return allResults, parallelErr + } + if sequentialErr != nil { + return allResults, sequentialErr } - return results, err + return allResults, nil } -// runWithFactory method delegates execution of tests to the runners generated through the factory function. -func runWithFactory(ctx context.Context, testers []Tester) ([]TestResult, error) { +func maxNumberRoutines() (int, error) { + var err error + maxRoutines := defaultMaximumRoutines + v, ok := os.LookupEnv(maximumNumberParallelTest) + if ok { + maxRoutines, err = strconv.Atoi(v) + if err != nil { + return 0, fmt.Errorf("failed to read number of maximum routines from environment variable: %w", err) + } + } + return maxRoutines, nil +} + +func runSuite(ctx context.Context, testers []Tester) ([]TestResult, error) { + if len(testers) == 0 { + return nil, nil + } + logger.Debugf("Running tests sequentially") var results []TestResult for _, tester := range testers { r, err := run(ctx, tester) if err != nil { - return nil, fmt.Errorf("error running package %s tests: %w", tester.Type(), err) + return results, fmt.Errorf("error running package %s tests: %w", tester.Type(), err) } results = append(results, r...) } + + return results, nil +} + +// runSuiteParallel method delegates execution of tests to the runners generated through the factory function. +func runSuiteParallel(ctx context.Context, testers []Tester) ([]TestResult, error) { + if len(testers) == 0 { + return nil, nil + } + maxRoutines, err := maxNumberRoutines() + if err != nil { + return nil, err + } + + var wg sync.WaitGroup + type routineResult struct { + results []TestResult + err error + } + chResults := make(chan routineResult, len(testers)) + + logger.Debugf("Running tests in parallel. Maximum routines to run in parallel: %d", maxRoutines) + // Use channel as a semaphore to limit the number of test executions in parallel + sem := make(chan int, maxRoutines) + + for _, tester := range testers { + wg.Add(1) + tester := tester + sem <- 1 + go func() { + defer wg.Done() + defer func() { + <-sem + }() + if err := ctx.Err(); err != nil { + logger.Errorf("context error: %s", context.Cause(ctx)) + chResults <- routineResult{nil, err} + return + } + r, err := run(ctx, tester) + chResults <- routineResult{r, err} + }() + } + + wg.Wait() + close(chResults) + close(sem) + + var results []TestResult + var multiErr error + testType := testers[0].Type() + for testResults := range chResults { + if testResults.err != nil { + multiErr = errors.Join(multiErr, testResults.err) + } + + results = append(results, testResults.results...) + } + + if multiErr != nil { + return results, fmt.Errorf("error running package %s tests: %w", testType, multiErr) + } + return results, nil } @@ -376,3 +491,12 @@ func PackageHasDataStreams(manifest *packages.PackageManifest) (bool, error) { return false, fmt.Errorf("unexpected package type %q", manifest.Type) } } + +func AnySkipConfig(configs ...*SkipConfig) *SkipConfig { + for _, config := range configs { + if config != nil { + return config + } + } + return nil +} diff --git a/test/packages/parallel/apache/_dev/test/config.yml b/test/packages/parallel/apache/_dev/test/config.yml new file mode 100644 index 000000000..a57750f85 --- /dev/null +++ b/test/packages/parallel/apache/_dev/test/config.yml @@ -0,0 +1,2 @@ +system: + parallel: true diff --git a/test/packages/parallel/nginx/_dev/test/config.yml b/test/packages/parallel/nginx/_dev/test/config.yml new file mode 100644 index 000000000..a57750f85 --- /dev/null +++ b/test/packages/parallel/nginx/_dev/test/config.yml @@ -0,0 +1,2 @@ +system: + parallel: true diff --git a/test/packages/parallel/sql_input/_dev/test/config.yml b/test/packages/parallel/sql_input/_dev/test/config.yml new file mode 100644 index 000000000..a57750f85 --- /dev/null +++ b/test/packages/parallel/sql_input/_dev/test/config.yml @@ -0,0 +1,2 @@ +system: + parallel: true