Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Place messages from misconfigured IDs in the replay queue #711

Merged
merged 8 commits into from
May 14, 2024

Conversation

constanca-m
Copy link
Contributor

@constanca-m constanca-m commented May 7, 2024

What does this PR do?

Prints error message if the input id from the trigger is not present in config.yaml, and places the message in the replay queue.

Why is this PR important?

Issues linked explained it well, but to sum it up:

  • If the ID in the config.yaml is set incorrectly, but ESF is associated with the right trigger inputs ids (read next section to get better understanding on how this can happen), we should print an error that the ID is not present in the config.yaml so there is no output available for it. This way, all these messages will go to the replay queue, and we will count these events as error_events.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.md

How to test this PR locally

I added a how-to-test directory in the commits. Please follow the README.md file from that directory.
Important: You need to use it with the code from this PR that is currently not merged, otherwise it will fail.

I also added some tests to check the undefined input that will run in one of the GitHub workflows.

Related issues

Tests and Results

SQS trigger

config.yaml looks like this:

"inputs":
- "id": "arn:aws:sqs:eu-west-2:627286350134:wrong-sqs"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.sqs-default"
    "type": "elasticsearch"
  "type": "sqs"

And ESF has aws_lambda_event_source_mapping configured correctly with the SQS ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.
INIT_START Runtime Version: python:3.9.v51	Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{
    "@timestamp": "2024-05-07T10:27:19.154Z",
    "log.level": "info",
    "message": "Found credentials in environment variables.",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "botocore.credentials",
        "origin": {
            "file": {
                "line": 1147,
                "name": "credentials.py"
            },
            "function": "load"
        },
        "original": "Found credentials in environment variables."
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    }
}

START RequestId: 85df0f38-3743-527e-8126-518c3377948d Version: $LATEST
{
    "@timestamp": "2024-05-07T10:27:19.320Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 56,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    },
    "type": "sqs"
}

{
    "@timestamp": "2024-05-07T10:27:19.320Z",
    "log.level": "info",
    "message": "config file",
    "bucket_name": "constanca-test-esf-config-bucket",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 204,
                "name": "utils.py"
            },
            "function": "config_yaml_from_s3"
        },
        "original": "config file"
    },
    "object_key": "config.yaml",
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T10:27:19.818Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 338,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    },
    "size": 1
}

{
    "@timestamp": "2024-05-07T10:27:19.819Z",
    "log.level": "error", 1, <------------------ FIXED
    "message": "no input defined",
    "ecs": {
        "version": "1.6.0"
    },
    "input_id": "arn:aws:sqs:eu-west-2:627286350134:constanca-esf-issue-696",
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 428,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "no input defined"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T10:27:20.035Z",
    "log.level": "info",
    "message": "lambda processed all the events",
    "ecs": {
        "version": "1.6.0"
    },
    "empty_events": 0,
    "error_events": 1, <------------------ NEW
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 554,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "lambda processed all the events"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139802417399616,
            "name": "MainThread"
        }
    },
    "sent_events": 0,
    "skipped_events": 0
}

END RequestId: 85df0f38-3743-527e-8126-518c3377948d
REPORT RequestId: 85df0f38-3743-527e-8126-518c3377948d	Duration: 738.26 ms	Billed Duration: 739 ms	Memory Size: 128 MB	Max Memory Used: 89 MB	Init Duration: 707.29 ms

The replaying queue, constanca-test-esf-replay-queue, has Messages available: 1.

Cloudwatch logs trigger

config.yaml looks like this:

"inputs":
- "id": "arn:aws:logs:eu-west-2:627286350134:log-group:wrong-log-group:*"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.cloudwatchlogs-default"
    "type": "elasticsearch"
  "type": "cloudwatch-logs"

And ESF has aws_cloudwatch_log_subscription_filter configured correctly with the Cloudwatch Logs ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.
INIT_START Runtime Version: python:3.9.v51	Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{
    "@timestamp": "2024-05-07T10:46:12.197Z",
    "log.level": "info",
    "message": "Found credentials in environment variables.",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "botocore.credentials",
        "origin": {
            "file": {
                "line": 1147,
                "name": "credentials.py"
            },
            "function": "load"
        },
        "original": "Found credentials in environment variables."
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    }
}

START RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421 Version: $LATEST
{
    "@timestamp": "2024-05-07T10:46:12.360Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 56,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    },
    "type": "cloudwatch-logs"
}

{
    "@timestamp": "2024-05-07T10:46:12.360Z",
    "log.level": "info",
    "message": "config file",
    "bucket_name": "constanca-test-esf-config-bucket",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 204,
                "name": "utils.py"
            },
            "function": "config_yaml_from_s3"
        },
        "original": "config file"
    },
    "object_key": "config.yaml",
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T10:46:12.830Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 141,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    },
    "size": 1
}

{
    "@timestamp": "2024-05-07T10:46:14.483Z",
    "log.level": "error",  <--------------------- FIXED
    "message": "no input defined",
    "ecs": {
        "version": "1.6.0"
    },
    "input_id": "arn:aws:logs:%AWS_REGION%:627286350134:log-group:constanca-test-esf-log-group:*",
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 151,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "no input defined"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T10:46:14.698Z",
    "log.level": "info",
    "message": "lambda is going to shutdown",
    "ecs": {
        "version": "1.6.0"
    },
    "empty_events": 0,
    "error_events": 1, <---------------------- NEW
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 161,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "lambda is going to shutdown"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 140459008206656,
            "name": "MainThread"
        }
    },
    "sent_events": 0,
    "skipped_events": 0
}

END RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421
REPORT RequestId: e5adae49-87f2-4b3d-a504-a67cc7aaa421	Duration: 2370.59 ms	Billed Duration: 2371 ms	Memory Size: 128 MB	Max Memory Used: 107 MB	Init Duration: 697.21 ms

Note: The input id in the error message is"input_id": "arn:aws:logs:%AWS_REGION%:627286350134:log-group:constanca-test-esf-log-group:*". We don't have access to the region name from the message, so I set it as %AWS_REGION%.

Kinesis data stream trigger

config.yaml:

"inputs":
- "id": "arn:aws:kinesis:eu-west-2:627286350134:stream/wrong-kinesis"
  "outputs":
  - "args":
      "api_key": "..."
      "elasticsearch_url": "..."
      "es_datastream_name": "logs-esf.kinesis-default"
    "type": "elasticsearch"
  "type": "kinesis-data-stream"

And configured aws_lambda_event_source_mapping with the correct Kinesis data stream ARN.

ESF logs. To make it easier, I annotated the logs with <- in the relevant lines.
INIT_START Runtime Version: python:3.9.v51	Runtime Version ARN: arn:aws:lambda:eu-west-2::runtime:415294d499803f2ba2e75d2753b4523c4517c1c13e55d8ca19a44434f6cbb9a5
{
    "@timestamp": "2024-05-07T12:01:56.950Z",
    "log.level": "info",
    "message": "Found credentials in environment variables.",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "botocore.credentials",
        "origin": {
            "file": {
                "line": 1147,
                "name": "credentials.py"
            },
            "function": "load"
        },
        "original": "Found credentials in environment variables."
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    }
}

START RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0 Version: $LATEST
{
    "@timestamp": "2024-05-07T12:01:57.117Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 56,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    },
    "type": "kinesis-data-stream"
}

{
    "@timestamp": "2024-05-07T12:01:57.118Z",
    "log.level": "info",
    "message": "config file",
    "bucket_name": "constanca-test-esf-config-bucket",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 204,
                "name": "utils.py"
            },
            "function": "config_yaml_from_s3"
        },
        "original": "config file"
    },
    "object_key": "config.yaml",
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T12:01:57.591Z",
    "log.level": "info",
    "message": "trigger",
    "ecs": {
        "version": "1.6.0"
    },
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 237,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "trigger"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    },
    "size": 1
}

{
    "@timestamp": "2024-05-07T12:01:57.592Z",
    "log.level": "error", <----------------------- FIXED
    "message": "no input defined",
    "ecs": {
        "version": "1.6.0"
    },
    "input_id": "arn:aws:kinesis:eu-west-2:627286350134:stream/constanca-test-esf-kinesis",
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 243,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "no input defined"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    }
}

{
    "@timestamp": "2024-05-07T12:01:57.825Z",
    "log.level": "info",
    "message": "lambda is going to shutdown",
    "ecs": {
        "version": "1.6.0"
    },
    "empty_events": 0,
    "error_events": 1, <----------------------- NEW
    "log": {
        "logger": "root",
        "origin": {
            "file": {
                "line": 256,
                "name": "handler.py"
            },
            "function": "lambda_handler"
        },
        "original": "lambda is going to shutdown"
    },
    "process": {
        "name": "MainProcess",
        "pid": 8,
        "thread": {
            "id": 139842871994176,
            "name": "MainThread"
        }
    },
    "sent_events": 0,
    "skipped_events": 0
}

END RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0
REPORT RequestId: a2de79e1-fe33-4b17-ac29-ae708b4bf9e0	Duration: 734.63 ms	Billed Duration: 735 ms	Memory Size: 128 MB	Max Memory Used: 89 MB	Init Duration: 716.69 ms

Replay Queue

After all the three failed triggers, the replay queue has Messages available: 3.

Signed-off-by: constanca <[email protected]>
@constanca-m constanca-m self-assigned this May 7, 2024
Signed-off-by: constanca <[email protected]>
Signed-off-by: constanca <[email protected]>
Signed-off-by: constanca <[email protected]>
@@ -462,20 +464,6 @@ def test_lambda_handler_noop(self) -> None:
del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"]
assert handler(lambda_event, ctx) == "completed" # type:ignore

with self.subTest("no input defined for kinesis-data-stream"):
Copy link
Contributor Author

@constanca-m constanca-m May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this test because it does not make sense. Reference.

We have now tests in test_integrations to check if inputs are defined.

Signed-off-by: constanca <[email protected]>
@@ -399,6 +399,8 @@ def test_lambda_handler_noop(self) -> None:
with self.subTest("no originalEventSourceARN in messageAttributes"):
ctx = ContextMock()
os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key"
os.environ["SQS_REPLAY_URL"] = "https://sqs.us-east-2.amazonaws.com/123456789012/replay_queue"
os.environ["SQS_CONTINUE_URL"] = "https://sqs.us-east-2.amazonaws.com/123456789012/continue_queue"
lambda_event = deepcopy(_dummy_lambda_event)
del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"]
assert handler(lambda_event, ctx) == "completed" # type:ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Struggling to understand why this assertion is completed?

The originalEventSourceARN == arn:aws:sqs:eu-central-1:123456789:sqs-queue" but here we have a different region. Should we raise another status?

Copy link
Contributor Author

@constanca-m constanca-m May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using the originalEventSourceARN attribute here, but the arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue instead (so the eventSourceARN attribute). And the queues are not being used... But that is a great point. I suspect it would fail with different regions, but it is never being tested in this file (there are no timeout tests or misconfigured ids here, so no reason to use those queues). I had to add the line because of this part:

sqs_replaying_queue = os.environ["SQS_REPLAY_URL"]
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

I will fix this as it does not make sense indeed

- for:
var: REQUIREMENTS
split: ','
cmd: pip3.9 install -r ../{{ .ITEM }} -t $DIR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider taking this from mypy.ini and python_version = 3.9 variable

In my case this was failing in my local tests (because did not have versin 3.9)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While building it locally I see in lamda

[ERROR] Runtime.ImportModuleError: Unable to import module 'main_aws': No module named 'orjson.orjson'
Traceback (most recent call last):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used python 3.9 here because we use this for the tests and for our ESF terraform files:

python-version: '3.9' # As defined in tests/scripts/docker/run_tests.sh

There is an open issue to offer support to version 3.12 as well: #656.

@constanca-m constanca-m merged commit 45e6c35 into elastic:main May 14, 2024
5 checks passed
@constanca-m constanca-m deleted the error-undefined-input branch May 14, 2024 12:12
@constanca-m constanca-m restored the error-undefined-input branch May 14, 2024 15:56
constanca-m added a commit that referenced this pull request May 14, 2024
@constanca-m constanca-m deleted the error-undefined-input branch May 14, 2024 15:56
@constanca-m constanca-m restored the error-undefined-input branch May 14, 2024 15:57
@constanca-m constanca-m deleted the error-undefined-input branch May 14, 2024 15:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Misconfigured ESF input results in a warning message, not events sent to replay queue
3 participants