Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3-sqs input: log a lambda event summary in case of errors #860

Merged
merged 9 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion handlers/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,96 @@ def wrapper(lambda_event: dict[str, Any], lambda_context: context_.Context) -> s
if apm_client:
apm_client.capture_exception()

shared_logger.exception("exception raised", exc_info=e)
shared_logger.exception(
"exception raised",
exc_info=e,
extra={
"summary": summarize_lambda_event(lambda_event, max_records=20),
},
)

return f"exception raised: {e.__repr__()}"

return wrapper


def summarize_lambda_event(event: dict[str, Any], max_records: int = 10) -> dict[str, Any]:
    """
    Summarize the lambda event to include only the most relevant information.

    Scans the lambda event's SQS records, parses each S3 notification body,
    and returns a compact summary: the total number of S3 notification
    records seen, plus the `s3` section (bucket + object key) of at most
    `max_records` of them. Example of a single notification record body:

        {
            "Records": [
                {
                    "awsRegion": "eu-west-1",
                    "eventName": "ObjectCreated:Put",
                    "eventSource": "aws:s3",
                    "eventVersion": "2.1",
                    "s3": {
                        "bucket": {
                            "arn": "arn:aws:s3:::mbranca-esf-data",
                            "name": "mbranca-esf-data"
                        },
                        "object": {
                            "key": "AWSLogs/1234567890/CloudTrail-Digest/"
                        }
                    }
                }
            ]
        }

    Never raises: any failure while summarizing is logged and surfaced
    through an "error" entry in the returned summary instead.
    """
    summary: dict[str, Any] = {}

    try:
        # The key name encodes the limit so readers of the log entry can
        # tell how many entries the summary was capped at.
        first_records_key = f"first_{max_records}_records"

        for sqs_record in event.get("Records", []):
            if sqs_record.get("eventSource", "unknown") != "aws:sqs":
                continue

            # Reuse the running summary if we already saw an SQS record,
            # otherwise start a fresh one. It is only attached to `summary`
            # after the body has been parsed successfully, so a parse
            # failure on the first record leaves no "aws:sqs" entry behind.
            existing = summary.get("aws:sqs")
            sqs_summary: dict[str, Any] = (
                existing if existing is not None else {"total_records": 0, first_records_key: []}
            )

            notification_records = json_parser(sqs_record["body"])["Records"]

            # Count every notification record, even the ones we don't keep,
            # so users can tell when the summary is incomplete.
            sqs_summary["total_records"] += len(notification_records)

            kept = sqs_summary[first_records_key]
            for notification in notification_records:
                # Stop collecting once the cap is reached; only the `s3`
                # section (bucket + object key) of each record is kept.
                if len(kept) == max_records:
                    break
                kept.append(notification.get("s3"))

            summary["aws:sqs"] = sqs_summary

    except Exception as exc:
        shared_logger.exception("error summarizing lambda event", exc_info=exc)
        # Surface the failure so users know the summary is incomplete.
        summary["error"] = str(exc)

    return summary


def discover_integration_scope(s3_object_key: str) -> str:
if s3_object_key == "":
shared_logger.debug("s3 object key is empty, dataset set to `generic`")
Expand Down
98 changes: 96 additions & 2 deletions tests/handlers/aws/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ def test_get_trigger_type_and_config_source(self) -> None:
assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE)

with self.subTest("no Records"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {}

get_trigger_type_and_config_source(event=event)

with self.subTest("len(Records) < 1"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {"Records": []}

get_trigger_type_and_config_source(event=event)
Expand Down Expand Up @@ -460,3 +460,97 @@ def test_without_variables(self) -> None:

with pytest.raises(ValueError):
get_lambda_region()


@pytest.mark.unit
class TestSummarizeLambdaEvent(TestCase):
    """Unit tests for `handlers.aws.utils.summarize_lambda_event`."""

    # Arbitrary limit, large enough that it never truncates the
    # single/double-record fixtures used below.
    max_records = 42

    # Shared bucket section of the expected `s3` entries.
    _bucket = {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"}

    def test_with_single_s3_sqs_record(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        body = '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/627286350134/CloudTrail-Digest/"}}}]}'  # noqa: E501
        event = {"Records": [{"body": body, "eventSource": "aws:sqs"}]}

        result = summarize_lambda_event(event=event, max_records=self.max_records)

        expected = {
            "aws:sqs": {
                "total_records": 1,
                f"first_{self.max_records}_records": [
                    {"bucket": self._bucket, "object": {"key": "AWSLogs/627286350134/CloudTrail-Digest/"}},
                ],
            }
        }
        assert result == expected

    def test_with_multiple_s3_sqs_records(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        body = '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/1.log"}}},{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/2.log"}}}]}'  # noqa: E501
        event = {"Records": [{"body": body, "eventSource": "aws:sqs"}]}

        first_entry = {"bucket": self._bucket, "object": {"key": "AWSLogs/123456789012/1.log"}}
        second_entry = {"bucket": self._bucket, "object": {"key": "AWSLogs/123456789012/2.log"}}

        with self.subTest("no limits"):
            # Limit is far above the record count: both entries are kept.
            result = summarize_lambda_event(event=event, max_records=self.max_records)

            expected = {
                "aws:sqs": {
                    "total_records": 2,
                    f"first_{self.max_records}_records": [first_entry, second_entry],
                }
            }
            assert result == expected

        with self.subTest("with limits"):
            # Limit of 1 truncates the list, but total_records still
            # reports every notification seen.
            result = summarize_lambda_event(event=event, max_records=1)

            expected = {
                "aws:sqs": {
                    "total_records": 2,
                    "first_1_records": [first_entry],
                }
            }
            assert result == expected

    def test_with_invalid_s3_sqs_notification(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        event = {"Records": [{"body": "I am not a valid JSON string.", "eventSource": "aws:sqs"}]}

        result = summarize_lambda_event(event)

        # The parse failure is reported through the "error" key and no
        # partial "aws:sqs" summary is left behind.
        assert result == {"error": "unexpected character: line 1 column 1 (char 0)"}
Loading