
Commit

dead letter index: align error field to ECS and do not forward retryable errors (#793)

This PR brings the following changes:

- Align the `error` field in documents sent to the dead letter index (DLI) to the ECS format; the field now provides `error.message` and `error.type`.
- Add the `http.response.status_code` field.
- Limit the error type sent to the DLI; do not send Elasticsearch client errors that:
  - are connection errors (do not have an `http.response.status_code`)
  - have a retryable status code (for example, 429)
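The forwarding rule above can be sketched as follows (a minimal illustration under the PR's stated behavior, not the repository's exact code; `goes_to_dead_letter_index` is a hypothetical helper):

```python
import http

# Status codes the forwarder treats as retryable (only 429 in this PR).
_RETRYABLE_STATUS_CODES = {http.HTTPStatus.TOO_MANY_REQUESTS}


def goes_to_dead_letter_index(failed_action: dict) -> bool:
    """Decide whether a failed action should be indexed in the DLI.

    Connection errors carry no `http.response.status_code`, and actions
    with a retryable status code are replayed rather than dead-lettered.
    """
    if "http" not in failed_action:
        # No status code means a connection error: retryable.
        return False
    status = failed_action["http"]["response"]["status_code"]
    return status not in _RETRYABLE_STATUS_CODES


# A mapping conflict (HTTP 400) goes to the DLI; 429 and connection errors do not.
print(goes_to_dead_letter_index({"http": {"response": {"status_code": 400}}}))  # True
print(goes_to_dead_letter_index({"http": {"response": {"status_code": 429}}}))  # False
print(goes_to_dead_letter_index({}))  # False
```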

Here is a sample error document from a mapping conflict:

```json
{
  "@timestamp": "2024-10-07T05:57:59.448925Z",
  "message": "{\"hey\":{\"message\":\"hey there\"},\"_id\":\"e6542822-4583-438d-9b4d-1a3023b5eeb9\",\"_op_type\":\"create\",\"_index\":\"logs-succeed.pr793-default\"}",
  "error": {
    "message": "[1:30] failed to parse field [hey] of type [keyword] in document with id 'e6542822-4583-438d-9b4d-1a3023b5eeb9'. Preview of field's value: '{message=hey there}'",
    "type": "document_parsing_exception"
  },
  "http": {
    "response": {
      "status_code": 400
    }
  }
}
```

- The error field must always match the definition of the ECS error field.
- Avoid sending connection-related errors to DLI.
- Avoid sending errors with retryable status codes to DLI.
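A simplified sketch of how a bulk-response item can be normalized into the ECS-style `error` field (illustrative only; the PR's actual implementation lives in `shippers/es.py`):

```python
from typing import Any


def to_ecs_error(item: dict[str, Any]) -> dict[str, Any]:
    """Normalize an Elasticsearch bulk-response item to ECS-style fields."""
    field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}}

    # The status is only a number for HTTP responses; connection
    # failures may report it as a string, so it is skipped here.
    if isinstance(item.get("status"), int):
        field["http"] = {"response": {"status_code": item["status"]}}

    err = item.get("error")
    if isinstance(err, str):
        # Connection errors report the error as a plain string.
        field["error"]["message"] = err
    elif isinstance(err, dict):
        # HTTP-level failures carry "reason" and "type" keys.
        field["error"]["message"] = err.get("reason", field["error"]["message"])
        field["error"]["type"] = err.get("type", field["error"]["type"])

    return field


print(to_ecs_error({"status": 400, "error": {"type": "document_parsing_exception", "reason": "failed to parse"}}))
# {'error': {'message': 'failed to parse', 'type': 'document_parsing_exception'}, 'http': {'response': {'status_code': 400}}}
```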
kaiyan-sheng authored Oct 8, 2024
1 parent d39cad4 commit 96fb54e
Showing 4 changed files with 285 additions and 13 deletions.
21 changes: 20 additions & 1 deletion docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
@@ -229,11 +229,30 @@ For `elasticsearch` the following arguments are supported:
* `args.password` Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined.
* `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined.
* `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**.
* `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error.
* `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. The elasticsearch output will NOT forward retryable errors (connection failures, HTTP status code 429) to the dead letter index.
* `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
* `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.
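For reference, the dead letter index is enabled from the `elasticsearch` output arguments in `config.yaml`; a hypothetical fragment (the ARN, URL, and index names are illustrative):

```yaml
inputs:
  - type: "sqs"
    id: "arn:aws:sqs:eu-central-1:123456789012:source-queue"
    outputs:
      - type: "elasticsearch"
        args:
          elasticsearch_url: "https://elasticsearch.example.com:9243"
          api_key: "<base64encode(api_key_id:api_key_secret)>"
          es_datastream_name: "logs-generic-default"
          # Non-retryable indexing failures are redirected here.
          es_dead_letter_index: "logs-generic-default-dli"
```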

. Here is a sample error indexed in the dead letter index:
+
[source, json]
----
{
  "@timestamp": "2024-10-07T05:57:59.448925Z",
  "message": "{\"hey\":{\"message\":\"hey there\"},\"_id\":\"e6542822-4583-438d-9b4d-1a3023b5eeb9\",\"_op_type\":\"create\",\"_index\":\"logs-succeed.pr793-default\"}",
  "error": {
    "message": "[1:30] failed to parse field [hey] of type [keyword] in document with id 'e6542822-4583-438d-9b4d-1a3023b5eeb9'. Preview of field's value: '{message=hey there}'",
    "type": "document_parsing_exception"
  },
  "http": {
    "response": {
      "status_code": 400
    }
  }
}
----

For `logstash` the following arguments are supported:

* `args.logstash_url`: URL of {ls} endpoint in the format `http(s)://host:port`
114 changes: 103 additions & 11 deletions shippers/es.py
@@ -3,6 +3,7 @@
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
import http
import uuid
from typing import Any, Dict, Optional, Union

@@ -21,7 +22,10 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409
# List of HTTP status codes that are considered retryable
_retryable_http_status_codes = [
http.HTTPStatus.TOO_MANY_REQUESTS,
]


class JSONSerializer(Serializer):
@@ -172,11 +176,13 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio
"elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
if "status" in error["create"] and error["create"]["status"] == http.HTTPStatus.CONFLICT:
# Skip duplicate events on dead letter index and replay queue
continue

failed.append({"error": error["create"]["error"], "action": action_failed[0]})
failed_error = {"action": action_failed[0]} | self._parse_error(error["create"])

failed.append(failed_error)

if len(failed) > 0:
shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
@@ -185,6 +191,52 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio

return failed

def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]:
"""
Parses the error response from Elasticsearch and returns a
standardised error field.
The error field is a dictionary with the following keys:
- `message`: The error message
- `type`: The error type
If the error is not recognised, the `message` key is set
to "Unknown error".
It also sets the status code in the http field if it is present
as a number in the response.
"""
field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}}

if "status" in error and isinstance(error["status"], int):
# Collecting the HTTP response status code in the
# error field, if present, and the type is an integer.
#
# Sometimes the status code is a string, for example,
# when the connection to the server fails.
field["http"] = {"response": {"status_code": error["status"]}}

if "error" not in error:
return field

if isinstance(error["error"], str):
# Can happen with connection errors.
field["error"]["message"] = error["error"]
if "exception" in error:
# The exception field is usually an Exception object,
# so we convert it to a string.
field["error"]["type"] = str(type(error["exception"]))
elif isinstance(error["error"], dict):
# Can happen with status 5xx errors.
# In this case, we look for the "reason" and "type" fields.
if "reason" in error["error"]:
field["error"]["message"] = error["error"]["reason"]
if "type" in error["error"]:
field["error"]["type"] = error["error"]["type"]

return field

def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
self._event_id_generator = event_id_generator

@@ -243,26 +295,59 @@ def flush(self) -> None:
return

def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
"""
Index the failed actions in the dead letter index (DLI).
This function attempts to index failed actions to the DLI, but may not do so
for one of the following reasons:
1. The failed action could not be encoded for indexing in the DLI.
2. ES returned an error on the attempt to index the failed action in the DLI.
3. The failed action error is retryable (connection error or status code 429).
Retryable errors are not indexed in the DLI, as they are expected to be
sent again to the data stream at `es_datastream_name` by the replay handler.
Args:
actions (list[Any]): A list of actions to index in the DLI.
Returns:
list[Any]: A list of actions that were not indexed in the DLI due to one of
the reasons mentioned above.
"""
non_indexed_actions: list[Any] = []
encoded_actions = []
dead_letter_errors: list[Any] = []

for action in actions:
if (
"http" not in action # no http status: connection error
or action["http"]["response"]["status_code"] in _retryable_http_status_codes
):
# We don't want to forward this action to
# the dead letter index.
#
# Add the action to the list of non-indexed
# actions and continue with the next one.
non_indexed_actions.append(action)
continue

# Reshape event to dead letter index
encoded = self._encode_dead_letter(action)
if not encoded:
shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
dead_letter_errors.append(action)
non_indexed_actions.append(action)

encoded_actions.append(encoded)

# If no action can be encoded, return original action list as failed
if len(encoded_actions) == 0:
return dead_letter_errors
return non_indexed_actions

errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
failed = self._handle_outcome(actions=encoded_actions, errors=errors)

if not isinstance(failed, list) or len(failed) == 0:
return dead_letter_errors
return non_indexed_actions

for action in failed:
event_payload = self._decode_dead_letter(action)
@@ -271,25 +356,32 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action})
continue

dead_letter_errors.append(event_payload)
non_indexed_actions.append(event_payload)

return dead_letter_errors
return non_indexed_actions

def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]:
if "action" not in outcome or "error" not in outcome:
return {}

# Assign a random id so that, if bulk() results in an error, it can be matched
# to the original action
return {
encoded = {
"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"_id": f"{uuid.uuid4()}",
"_id": str(uuid.uuid4()),
"_index": self._es_dead_letter_index,
"_op_type": "create",
"message": json_dumper(outcome["action"]),
"error": outcome["error"],
}

if "http" in outcome:
# the `http.response.status_code` is not
# always present in the error field.
encoded["http"] = outcome["http"]

return encoded

def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]:
if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
return {}
116 changes: 115 additions & 1 deletion tests/handlers/aws/test_integrations.py
@@ -4292,10 +4292,11 @@ def test_es_dead_letter_index(self) -> None:
assert res["hits"]["total"] == {"value": 1, "relation": "eq"}

assert (
res["hits"]["hits"][0]["_source"]["error"]["reason"]
res["hits"]["hits"][0]["_source"]["error"]["message"]
== "test_es_non_indexable_dead_letter_index fail message"
)
assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception"
assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500
dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"])
assert dead_letter_message["log"]["offset"] == 0
assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path
@@ -4419,3 +4420,116 @@ def test_es_non_indexable_dead_letter_index(self) -> None:
assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]

def test_es_dead_letter_index_with_retryable_errors(self) -> None:
"""
Test that retryable errors are not redirected to the dead letter index (DLI).
"""
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.localstack, LocalStackContainer)

sqs_queue_name = _time_based_id(suffix="source-sqs")
sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

dead_letter_index_name = "logs-generic-default-dli"

sqs_queue_arn = sqs_queue["QueueArn"]
sqs_queue_url = sqs_queue["QueueUrl"]
sqs_queue_url_path = sqs_queue["QueueUrlPath"]

config_yaml: str = f"""
inputs:
- type: sqs
id: "{sqs_queue_arn}"
tags: {self.default_tags}
outputs:
- type: "elasticsearch"
args:
# This IP address is non-routable and
# will always result in a connection failure.
elasticsearch_url: "0.0.0.0:9200"
es_dead_letter_index: "{dead_letter_index_name}"
ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
username: "{self.secret_arn}:username"
password: "{self.secret_arn}:password"
"""

config_file_path = "config.yaml"
config_bucket_name = _time_based_id(suffix="config-bucket")
_s3_upload_content_to_bucket(
client=self.s3_client,
content=config_yaml.encode("utf-8"),
content_type="text/plain",
bucket_name=config_bucket_name,
key=config_file_path,
)

os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

fixtures = [
_load_file_fixture("cloudwatch-log-1.json"),
]

_sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
message_id = event["Records"][0]["messageId"]

# Create pipeline to reject documents
processors = {
"processors": [
{
"fail": {
"message": "test_es_dead_letter_index_with_retryable_errors fail message",
}
},
]
}

self.elasticsearch.put_pipeline(
id="test_es_dead_letter_index_with_retryable_errors_fail_pipeline",
body=processors,
)

self.elasticsearch.create_data_stream(name="logs-generic-default")
self.elasticsearch.put_settings(
index="logs-generic-default",
body={"index.default_pipeline": "test_es_dead_letter_index_with_retryable_errors_fail_pipeline"},
)

self.elasticsearch.refresh(index="logs-generic-default")

self.elasticsearch.create_data_stream(name=dead_letter_index_name)

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
first_call = handler(event, ctx) # type:ignore

assert first_call == "completed"

# Test document has been rejected from target index
self.elasticsearch.refresh(index="logs-generic-default")

assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0

# Test event does not go into the dead letter index
assert self.elasticsearch.exists(index=dead_letter_index_name) is True

self.elasticsearch.refresh(index=dead_letter_index_name)

assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0

# Test event has been redirected into the replay queue
events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
assert len(events["Records"]) == 1

first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])

assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
assert first_body["event_payload"]["log"]["offset"] == 0
assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
assert first_body["event_payload"]["cloud"]["provider"] == "aws"
assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
