Enable shared outputs and multiple outputs per input #721

Closed
constanca-m opened this issue May 16, 2024 · 0 comments

constanca-m commented May 16, 2024

Enhancement

Currently, every input has to define its own output (see an example of the configuration file here).

However, this is inconvenient. If a user wants to use the same output for 4 inputs, they have to define it 4 times, and if they want to change it later, they have to edit it another 4 times. Instead, we could define it once as a shared output.

There are also two issues (1 and 2) requesting support for more than one output per input. At the moment this is not possible unless each output has a different type. The current options are:

_available_output_types: list[str] = ["elasticsearch", "logstash"]

The outputs for each input are saved in a dictionary:

self._outputs: dict[str, Output] = {}

However, this dictionary uses the output type as its key, so adding a second output of the same type fails:

def add_output(self, output_type: str, **kwargs: Any) -> None:
    """
    Output setter.
    Set an output given its type and init kwargs
    """
    if not isinstance(output_type, str):
        raise ValueError("`type` must be provided as string")
    if output_type in self._outputs:
        raise ValueError(f"Duplicated `type` {output_type}")
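To make the limitation concrete, here is a minimal, self-contained sketch that reproduces the duplicate check quoted above. `TypeKeyedInput` is a hypothetical stand-in for the real input class and stores plain kwargs instead of `Output` objects; only the two checks inside `add_output` are copied from the excerpt.

```python
from typing import Any


class TypeKeyedInput:
    """Hypothetical stand-in for an input whose outputs are keyed by type."""

    def __init__(self, input_id: str) -> None:
        self.id = input_id
        # Mirrors `self._outputs: dict[str, Output]`, but stores raw kwargs
        # instead of real Output objects.
        self._outputs: dict[str, dict[str, Any]] = {}

    def add_output(self, output_type: str, **kwargs: Any) -> None:
        if not isinstance(output_type, str):
            raise ValueError("`type` must be provided as string")
        if output_type in self._outputs:
            raise ValueError(f"Duplicated `type` {output_type}")
        self._outputs[output_type] = kwargs


event_input = TypeKeyedInput("<ID-1>")
event_input.add_output("elasticsearch", elasticsearch_url="<URL-1>")
try:
    # A second Elasticsearch output for the same input is rejected.
    event_input.add_output("elasticsearch", elasticsearch_url="<URL-2>")
except ValueError as err:
    print(err)  # Duplicated `type` elasticsearch
```

Because the type is the key, the only way to get two outputs on one input today is to make them different types (one `elasticsearch`, one `logstash`).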

Additionally, since each input is uniquely identified by its id in the code (see this function), I do not think it is possible to give the same input multiple outputs at all, even by declaring the input twice.

To clarify this change further, consider an example in which I want to:

  1. Have two inputs, each corresponding to one CloudWatch Logs group. Both should send data to the same ES.
  2. Have two outputs for one of the two CloudWatch Logs groups.

Currently, the best we can do is:

inputs:
  - id: "<ID-1>"
    type: cloudwatch-logs
    outputs:
      - args:
          api_key: "<API-KEY-1>"
          elasticsearch_url: "<URL-1>"
          es_datastream_name: "logs-esf.cloudwatch-default"
        type: elasticsearch
  - id: "<ID-2>"
    type: cloudwatch-logs
    outputs:
      - args:
          api_key: "<API-KEY-1>"
          elasticsearch_url: "<URL-1>"
          es_datastream_name: "logs-esf.cloudwatch-default"
        type: elasticsearch

So I have to define the same output twice, and goal 2 still cannot be achieved, since it is not possible.

With this enhancement, I could refactor the config.yaml file to look like this:

# Shared outputs
outputs:
  - args:
      api_key: "<API-KEY-1>"
      elasticsearch_url: "<URL-1>"
      es_datastream_name: "logs-esf.cloudwatch-default"
    type: elasticsearch
    id: my-shared-output-1

inputs:
  - id: "<ID-1>"
    type: cloudwatch-logs
    outputs:
      - args:
          api_key: "<API-KEY-2>"
          elasticsearch_url: "<URL-2>"
          es_datastream_name: "logs-esf.cloudwatch-default"
        type: elasticsearch
      - id: my-shared-output-1
  - id: "<ID-2>"
    type: cloudwatch-logs
    outputs:
      - id: my-shared-output-1

Explanation:

  1. I defined a top-level outputs list containing the shared outputs. Each shared output has an id field so it can be referenced by name; this matters when the user defines several shared outputs but does not want to apply all of them to every input.
  2. I can add more than one output to each inputs[*].outputs list, mixing inline definitions with references to shared outputs by id.
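The resolution step this format implies can be sketched as follows. It is a minimal sketch under two assumptions that are not part of the current code: the parsed YAML is available as a plain dict, and an entry in `inputs[*].outputs` whose only key is `id` is a reference to a shared output, while any other entry is an inline definition. `resolve_outputs` is a hypothetical helper name.

```python
import copy
from typing import Any

Config = dict[str, Any]


def resolve_outputs(config: Config) -> dict[str, list[Config]]:
    """Return each input's fully resolved list of output definitions."""
    # Index the shared outputs by their mandatory `id`.
    shared: dict[str, Config] = {}
    for output in config.get("outputs", []):
        output_id = output.get("id")
        if not output_id:
            raise ValueError("shared outputs must define an `id`")
        if output_id in shared:
            raise ValueError(f"duplicated shared output id {output_id}")
        shared[output_id] = output

    resolved: dict[str, list[Config]] = {}
    for input_config in config["inputs"]:
        outputs: list[Config] = []
        for entry in input_config.get("outputs", []):
            if set(entry) == {"id"}:
                # Reference to a shared output (assumed convention).
                if entry["id"] not in shared:
                    raise ValueError(f"unknown shared output id {entry['id']}")
                # Deep-copy so per-input changes cannot leak into other inputs.
                outputs.append(copy.deepcopy(shared[entry["id"]]))
            else:
                outputs.append(entry)  # inline output definition
        resolved[input_config["id"]] = outputs
    return resolved


# Python equivalent of the proposed config.yaml (es_datastream_name omitted).
config = {
    "outputs": [
        {
            "id": "my-shared-output-1",
            "type": "elasticsearch",
            "args": {"elasticsearch_url": "<URL-1>", "api_key": "<API-KEY-1>"},
        }
    ],
    "inputs": [
        {
            "id": "<ID-1>",
            "type": "cloudwatch-logs",
            "outputs": [
                {"type": "elasticsearch",
                 "args": {"elasticsearch_url": "<URL-2>", "api_key": "<API-KEY-2>"}},
                {"id": "my-shared-output-1"},
            ],
        },
        {"id": "<ID-2>", "type": "cloudwatch-logs",
         "outputs": [{"id": "my-shared-output-1"}]},
    ],
}

for input_id, outputs in resolve_outputs(config).items():
    print(input_id, [o["args"]["elasticsearch_url"] for o in outputs])
# <ID-1> ['<URL-2>', '<URL-1>']
# <ID-2> ['<URL-1>']
```

One appeal of resolving references up front is that the rest of the parsing code would keep seeing ordinary output definitions, so only the per-input uniqueness rule would still need to change.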

Steps to completion

The following parts of the code need to be changed.

The configuration file is parsed here:

config = parse_config(config_yaml, _expanders)

Inside this function, we iterate over each input's outputs:

for output_n, output_config in enumerate(input_config["outputs"]):

We need to change this function to allow more than one output per input, including several outputs of the same type, and to resolve references to shared outputs.
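One possible direction for the loop above is to stop keying outputs by type. The sketch below is hypothetical (`outputs_by_key` does not exist in the codebase): it keys each output by its `id` when one is given and otherwise falls back to `"<type>-<position>"`, reusing the `output_n` index that `enumerate` already provides. It assumes `input_config` is the parsed dict for a single input.

```python
from typing import Any


def outputs_by_key(input_config: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Key each output by its `id`, falling back to "<type>-<position>"."""
    outputs: dict[str, dict[str, Any]] = {}
    for output_n, output_config in enumerate(input_config["outputs"]):
        # Two outputs of the same type get different keys via their position.
        key = output_config.get("id") or f"{output_config['type']}-{output_n}"
        if key in outputs:
            raise ValueError(f"Duplicated output `id` {key}")
        outputs[key] = output_config
    return outputs


input_config = {
    "id": "<ID-1>",
    "outputs": [
        {"type": "elasticsearch", "args": {"elasticsearch_url": "<URL-1>"}},
        {"type": "elasticsearch", "args": {"elasticsearch_url": "<URL-2>"}},
    ],
}
print(list(outputs_by_key(input_config)))  # ['elasticsearch-0', 'elasticsearch-1']
```

This keeps a duplicate check, but applies it to output identities rather than output types.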

These outputs are then referenced here:

composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)

Note: event_input contains our outputs.

This composite_shipper receives our outputs and is later used when calling the following (this is just one example of its usage):

sent_outcome = composite_shipper.send(es_event)
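With several outputs per input, `composite_shipper.send` has to fan each event out to every output. The sketch below only illustrates that idea: `FanOutShipper` and `RecordingShipper` are hypothetical names, not the project's actual shipper classes, and the real `composite_shipper` may aggregate outcomes differently (here `send` returns one outcome per output).

```python
from dataclasses import dataclass, field
from typing import Any, Protocol


class Shipper(Protocol):
    def send(self, event: dict[str, Any]) -> str: ...


@dataclass
class RecordingShipper:
    """Hypothetical shipper that records events instead of sending them."""

    name: str
    sent: list[dict[str, Any]] = field(default_factory=list)

    def send(self, event: dict[str, Any]) -> str:
        self.sent.append(event)
        return "sent"


@dataclass
class FanOutShipper:
    """Hypothetical composite that forwards every event to all of an input's outputs."""

    shippers: list[Shipper] = field(default_factory=list)

    def add_shipper(self, shipper: Shipper) -> None:
        self.shippers.append(shipper)

    def send(self, event: dict[str, Any]) -> list[str]:
        # One outcome per output, so a caller can tell which output failed.
        return [shipper.send(event) for shipper in self.shippers]


es_inline = RecordingShipper("<URL-2>")
es_shared = RecordingShipper("my-shared-output-1")
composite = FanOutShipper()
composite.add_shipper(es_inline)
composite.add_shipper(es_shared)
outcomes = composite.send({"message": "hello"})
print(outcomes)  # ['sent', 'sent']
```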

After this change, the documentation here will also need to be updated.

PRs
