
Enable multiple outputs for each input #725

Merged

Conversation

constanca-m (Contributor)

What does this PR do?

Each input can now have multiple outputs of the same type. However, the same output cannot be specified more than once for a given input - that is, an input cannot have two elasticsearch outputs with the same destination. See the Results section for examples.

Why is it important?

See details in #721.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.md

How to test this PR locally

Refer to https://github.com/elastic/elastic-serverless-forwarder/tree/main/how-to-test-locally.

Related issues

Relates to #721

Results

Example 1: Trying 2 elasticsearch outputs

I have an input with two elasticsearch outputs:

"inputs":
- "id": "arn:aws:logs:eu-west-2:627286350134:log-group:constanca-test-local-esf:*"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io"
      "es_datastream_name": "logs-esf.cloudwatch-default"
    "type": "elasticsearch"
  - "args":
      "api_key": "..."
      "elasticsearch_url": "https://terraform-2.es.europe-west4.gcp.elastic-cloud.com"
      "es_datastream_name": "logs-esf.cloudwatch-default"
    "type": "elasticsearch"
  "type": "cloudwatch-logs"

I sent a log event from this input.

If I look at Discover in both clouds, I can see that both outputs received my message.

Example 2: Trying 2 elasticsearch outputs with the same destination

I have an input with two elasticsearch outputs that are the same:

"inputs":
- "id": "arn:aws:logs:eu-west-2:627286350134:log-group:constanca-test-local-esf:*"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io"
      "es_datastream_name": "logs-esf.cloudwatch-default"
    "type": "elasticsearch"
  - "args":
      "api_key": "..."
      "elasticsearch_url": "https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io"
      "es_datastream_name": "logs-esf.cloudwatch-default"
    "type": "elasticsearch"
  "type": "cloudwatch-logs"

This case fails, since duplicated outputs (that is, outputs with the same destination for the same input) are not allowed:

[ERROR] ConfigFileException: An error occurred while applying output configuration at position 2 for input arn:aws:logs:eu-west-2:627286350134:log-group:constanca-test-local-esf:*: Duplicated output destination https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io for type elasticsearch
Traceback (most recent call last):
  File "/var/task/handlers/aws/utils.py", line 63, in wrapper
    return func(lambda_event, lambda_context)
  File "/var/task/handlers/aws/utils.py", line 98, in wrapper
    raise e
  File "/var/task/handlers/aws/utils.py", line 82, in wrapper
    return func(lambda_event, lambda_context)
  File "/var/task/handlers/aws/handler.py", line 74, in lambda_handler
    raise ConfigFileException(e)

Example 3: Checking message body in replay queue

The message body produced by an ingestion error should now contain output_destination (instead of output_type, as before). I force the message into the replay queue by using invalid authentication:

"inputs":
- "id": "arn:aws:logs:eu-west-2:627286350134:log-group:constanca-test-local-esf:*"
  "outputs":
  - "args":
      "api_key": "<Wrong API Key"
      "elasticsearch_url": "https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io"
      "es_datastream_name": "logs-esf.cloudwatch-default"
    "type": "elasticsearch"

Message in the replay queue:

{"output_destination":"https://terraform-8b3bac.es.eu-central-1.aws.cloud.es.io","output_args":{"es_datastream_name":"logs-esf.cloudwatch-default"},"event_payload":...

@constanca-m constanca-m self-assigned this May 29, 2024

constanca-m (Contributor, Author):

I want to see if there is any significant change in performance between our latest released version, lambda-v1.14.0, and the ESF created from this PR.

To check this, I am going to trigger ESF with messages arriving in a CloudWatch Logs group. The group receives X events, then stops for 30 s, and then the cycle repeats. The number of events doubles every 20 cycles, and I let it run for 100 cycles. Summarizing:

| Number of events sent (then sleep 30 s) | Cycles |
| --- | --- |
| 50 | 20 |
| 100 | 20 |
| 200 | 20 |
| 400 | 20 |
| 800 | 20 |

The message being sent is small, only 94 B.
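
For reference, this is a rough sketch of the load generator described above, using boto3 against CloudWatch Logs. It is not the exact script used for the test; the log group and log stream names are placeholders and must already exist:

```python
import time

import boto3

logs = boto3.client("logs", region_name="eu-west-2")
LOG_GROUP = "constanca-test-local-esf"   # placeholder log group
LOG_STREAM = "perf-test"                 # placeholder log stream

events_per_cycle = 50
for cycle in range(100):
    # Double the number of events every 20 cycles: 50 -> 100 -> 200 -> 400 -> 800.
    if cycle > 0 and cycle % 20 == 0:
        events_per_cycle *= 2

    now_ms = int(time.time() * 1000)
    logs.put_log_events(
        logGroupName=LOG_GROUP,
        logStreamName=LOG_STREAM,
        logEvents=[
            {"timestamp": now_ms, "message": "x" * 94}  # ~94 B message
            for _ in range(events_per_cycle)
        ],
    )
    time.sleep(30)
```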

I also want to check how long it takes for a message to get from the CloudWatch Logs group to Elasticsearch. Since ESF does not add an ingest timestamp to the documents, I am going to set a default ingest pipeline that adds that field to every document.

You can add the ingest pipeline and the related templates like this:
# Create pipeline to add ingest timestamp
PUT _ingest/pipeline/add_date
{
  "description" : "This pipeline add the ingest time to the document",
  "processors" : [
    {
      "set" : {
        "field": "received",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

# Creates a component template for index settings
PUT _component_template/logs-esf-settings
{
  "template": {
    "settings": {
      "index.default_pipeline": "add_date",
      "index.lifecycle.name": "logs"
    }
  }
}

# Creates an index template
PUT _index_template/logs-esf-template
{
  "index_patterns": ["logs-esf.cloudwatch-*"],
  "data_stream": { },
  "priority": 500,
  "composed_of": ["logs-esf-settings"]
}

# Rollover the data stream to use an index with the pipeline
POST logs-esf.cloudwatch-default/_rollover

I deployed ESF and updated the lambda 3 times:

  1. First run: ESF is using lambda-v1.14.0 and it has 1 elasticsearch output
  2. Second run: ESF is using the code from this PR and it has 1 elasticsearch output
  3. Third run: ESF is using the code from this PR and it has 2 elasticsearch outputs

So the first line in my graphs corresponds to the first run, the second line to the second run, and the third line to the third run. The CloudWatch metrics for the ESF Lambda function are these:

[Image: CloudWatch metrics for the ESF Lambda function across the three runs]

There is very little difference between the three ESF deployments and their configurations. For the 2-output configuration, the average duration increased slightly. This is to be expected, as ESF now needs to iterate over two outputs.

I also computed the average latency using the field my ingest pipeline added. I calculate the latency this way (a query sketch follows the list):

  1. I create a visualization with the formula last_value(received)-last_value(@timestamp).
  2. I use a minimum interval of 30s, so only one document (the last one) is used for each interval.
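
The same latency can also be computed directly against Elasticsearch with a runtime field. This is a minimal sketch, assuming the received field is mapped as a date and using the elasticsearch-py client; the URL, API key, and data stream name are placeholders:

```python
from elasticsearch import Elasticsearch

# Placeholders: use your own deployment URL, API key, and data stream name.
es = Elasticsearch("https://<elasticsearch_url>", api_key="<api_key>")

resp = es.search(
    index="logs-esf.cloudwatch-default",
    size=0,
    # Runtime field: difference between the ingest timestamp set by the
    # pipeline ("received") and the event timestamp ("@timestamp"), in ms.
    runtime_mappings={
        "latency_ms": {
            "type": "long",
            "script": {
                "source": (
                    "if (doc['received'].size() > 0 && doc['@timestamp'].size() > 0) {"
                    " emit(doc['received'].value.toInstant().toEpochMilli()"
                    " - doc['@timestamp'].value.toInstant().toEpochMilli()); }"
                )
            },
        }
    },
    aggs={"avg_latency_ms": {"avg": {"field": "latency_ms"}}},
)

print(resp["aggregations"]["avg_latency_ms"]["value"])
```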

The latency for the first output (the one common to all three runs) looks like this:
[Image: latency for the first output across the three runs]

The latency for this output barely changed across my three runs.

The second output (present only in the third run) has a higher latency:
[Image: latency for the second output in the third run]

This is to be expected as well. The minimum average latency is already above the latency of the first output. From this we can conclude that ESF sent the data to the first output first and only then to the second output; the two sends did not happen in parallel.

My conclusion is that there is no significant change or degradation in performance. However, the more outputs the user adds, the higher the latency becomes for each additional output.


# Let's wrap the specific output shipper in the composite one, since the composite deepcopy the mutating events
shipper: CompositeShipper = CompositeShipper()

if output_type == "elasticsearch":
if output.type == "elasticsearch":
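
For context, here is an illustrative sketch only (not the repository's actual classes, and the names are hypothetical) of what wrapping per-output shippers in a composite accomplishes: each shipper receives its own deep copy of the event, so mutations made for one output do not leak into another.

```python
from copy import deepcopy
from typing import Any, Protocol


class Shipper(Protocol):
    def send(self, event: dict[str, Any]) -> None: ...


class SketchCompositeShipper:
    """Toy composite: fan an event out to every registered shipper."""

    def __init__(self) -> None:
        self._shippers: list[Shipper] = []

    def add_shipper(self, shipper: Shipper) -> None:
        self._shippers.append(shipper)

    def send(self, event: dict[str, Any]) -> None:
        for shipper in self._shippers:
            # Deep copy so one shipper's mutations don't affect the others.
            shipper.send(deepcopy(event))
```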

Contributor:

I would suggest keeping either output.type or output_destination consistent throughout the whole doc.

So output.type would become output.destination.

constanca-m (Contributor, Author):

output.destination is the URL - either the cloud URL or the ES URL. output.type can only be elasticsearch or logstash; they are different things @gizas

Contributor:

Ok, I had the impression that output.destination did not exist. Thanks for the clarification.
So destination could become output_url in general, to better denote what the variable actually holds. But it's not important now.


if output_type in self._outputs:
raise ValueError(f"Duplicated `type` {output_type}")
if output_type not in _available_output_types:

Contributor:

Same here, to change type to destination.

constanca-m (Contributor, Author):

Same comment as before: output.type can only be elasticsearch or logstash. We use it to create the shipper; it is not the same as the destination.

elif output_type == "logstash":
if "logstash_url" not in kwargs:
raise ValueError(f"Output type {output_type} requires logstash_url to be set")
output_dest = kwargs["logstash_url"]

constanca-m (Contributor, Author):

@gizas You can see in this line how the output destination is initialized.
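
To make the discussion concrete, here is an illustrative sketch (not the PR's exact code; class and function names are hypothetical) of how the destination can be derived per output type and used to reject duplicates within one input, matching the error shown in Example 2:

```python
def get_output_destination(output_type: str, **kwargs: str) -> str:
    """Derive the destination that must be unique per input."""
    if output_type == "elasticsearch":
        # The destination is the Elasticsearch URL or the cloud id.
        destination = kwargs.get("elasticsearch_url") or kwargs.get("cloud_id")
        if not destination:
            raise ValueError("Output type elasticsearch requires elasticsearch_url or cloud_id to be set")
        return destination
    if output_type == "logstash":
        if "logstash_url" not in kwargs:
            raise ValueError(f"Output type {output_type} requires logstash_url to be set")
        return kwargs["logstash_url"]
    raise ValueError(f"Type {output_type} is not supported")


class InputSketch:
    def __init__(self) -> None:
        # Outputs are keyed by (type, destination), not by type alone.
        self._outputs: dict[tuple[str, str], dict[str, str]] = {}

    def add_output(self, output_type: str, **kwargs: str) -> None:
        destination = get_output_destination(output_type, **kwargs)
        key = (output_type, destination)
        if key in self._outputs:
            raise ValueError(f"Duplicated output destination {destination} for type {output_type}")
        self._outputs[key] = kwargs
```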

@constanca-m constanca-m merged commit d7bf82f into elastic:main Jun 18, 2024
5 checks passed
@constanca-m constanca-m deleted the enable-multiple-and-shared-outputs branch June 18, 2024 10:54