diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
index 237782ed..d5225c65 100644
--- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
+++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
@@ -229,11 +229,30 @@ For `elasticsearch` the following arguments are supported:
  * `args.password` Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined.
  * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined.
  * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**.
- * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error.
+ * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. The elasticsearch output will NOT forward retryable errors (connection failures, HTTP status code 429) to the dead letter index; those events are sent to the replay queue so they can be retried against `args.es_datastream_name` later.
  * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
  * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
  * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.
+. Here is a sample error indexed in the dead letter index:
++
+[source, json]
+----
+{
+  "@timestamp": "2024-10-07T05:57:59.448925Z",
+  "message": "{\"hey\":{\"message\":\"hey there\"},\"_id\":\"e6542822-4583-438d-9b4d-1a3023b5eeb9\",\"_op_type\":\"create\",\"_index\":\"logs-succeed.pr793-default\"}",
+  "error": {
+    "message": "[1:30] failed to parse field [hey] of type [keyword] in document with id 'e6542822-4583-438d-9b4d-1a3023b5eeb9'. Preview of field's value: '{message=hey there}'",
+    "type": "document_parsing_exception"
+  },
+  "http": {
+    "response": {
+      "status_code": 400
+    }
+  }
+}
+----
+
 
 For `logstash` the following arguments are supported:
 
 * `args.logstash_url`: URL of {ls} endpoint in the format `http(s)://host:port`
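To make the documented routing rule concrete, here is a minimal sketch of the retryability check described above. The `is_retryable` helper and the sample dictionaries are illustrative only and are not part of the forwarder's code; the `http.response.status_code` shape matches the dead letter sample shown above.

[source, python]
----
from http import HTTPStatus
from typing import Any

# Hypothetical helper mirroring the documented behaviour: connection failures
# produce no HTTP response (hence no status code) and HTTP 429 responses are
# throttling errors; both are retryable and bypass the dead letter index.
RETRYABLE_STATUS_CODES = {HTTPStatus.TOO_MANY_REQUESTS}  # 429


def is_retryable(failed_action: dict[str, Any]) -> bool:
    if "http" not in failed_action:  # no response at all: connection failure
        return True
    return failed_action["http"]["response"]["status_code"] in RETRYABLE_STATUS_CODES


# A mapping error (HTTP 400) is indexed in the dead letter index:
assert is_retryable({"http": {"response": {"status_code": 400}}}) is False
# A throttled bulk request (HTTP 429) is replayed instead:
assert is_retryable({"http": {"response": {"status_code": 429}}}) is True
----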
diff --git a/shippers/es.py b/shippers/es.py
index 43d86e1b..68f7f59a 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -3,6 +3,7 @@
 # you may not use this file except in compliance with the Elastic License 2.0.
 
 import datetime
+import http
 import uuid
 from typing import Any, Dict, Optional, Union
 
@@ -21,7 +22,10 @@
 _EVENT_BUFFERED = "_EVENT_BUFFERED"
 _EVENT_SENT = "_EVENT_SENT"
 
-_VERSION_CONFLICT = 409
+# List of HTTP status codes that are considered retryable
+_retryable_http_status_codes = [
+    http.HTTPStatus.TOO_MANY_REQUESTS,
+]
 
 
 class JSONSerializer(Serializer):
@@ -172,11 +176,13 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
 
-            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
+            if "status" in error["create"] and error["create"]["status"] == http.HTTPStatus.CONFLICT:
                 # Skip duplicate events on dead letter index and replay queue
                 continue
 
-            failed.append({"error": error["create"]["error"], "action": action_failed[0]})
+            failed_error = {"action": action_failed[0]} | self._parse_error(error["create"])
+
+            failed.append(failed_error)
 
         if len(failed) > 0:
             shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
@@ -185,6 +191,52 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio
 
         return failed
 
+    def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]:
+        """
+        Parses the error response from Elasticsearch and returns a
+        standardised error field.
+
+        The error field is a dictionary with the following keys:
+
+        - `message`: The error message
+        - `type`: The error type
+
+        If the error is not recognised, the `message` key is set
+        to "Unknown error".
+
+        It also sets the status code in the http field if it is present
+        as a number in the response.
+        """
+        field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}}
+
+        if "status" in error and isinstance(error["status"], int):
+            # Collect the HTTP response status code in the
+            # `http` field, if it is present and is an integer.
+            #
+            # Sometimes the status code is a string, for example,
+            # when the connection to the server fails.
+            field["http"] = {"response": {"status_code": error["status"]}}
+
+        if "error" not in error:
+            return field
+
+        if isinstance(error["error"], str):
+            # Can happen with connection errors.
+            field["error"]["message"] = error["error"]
+            if "exception" in error:
+                # The exception field is usually an Exception object,
+                # so we convert it to a string.
+                field["error"]["type"] = str(type(error["exception"]))
+        elif isinstance(error["error"], dict):
+            # Can happen with status 5xx errors.
+            # In this case, we look for the "reason" and "type" fields.
+            if "reason" in error["error"]:
+                field["error"]["message"] = error["error"]["reason"]
+            if "type" in error["error"]:
+                field["error"]["type"] = error["error"]["type"]
+
+        return field
+
     def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
         self._event_id_generator = event_id_generator
 
+ if "reason" in error["error"]: + field["error"]["message"] = error["error"]["reason"] + if "type" in error["error"]: + field["error"]["type"] = error["error"]["type"] + + return field + def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None: self._event_id_generator = event_id_generator @@ -243,26 +295,59 @@ def flush(self) -> None: return def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: + """ + Index the failed actions in the dead letter index (DLI). + + This function attempts to index failed actions to the DLI, but may not do so + for one of the following reasons: + + 1. The failed action could not be encoded for indexing in the DLI. + 2. ES returned an error on the attempt to index the failed action in the DLI. + 3. The failed action error is retryable (connection error or status code 429). + + Retryable errors are not indexed in the DLI, as they are expected to be + sent again to the data stream at `es_datastream_name` by the replay handler. + + Args: + actions (list[Any]): A list of actions to index in the DLI. + + Returns: + list[Any]: A list of actions that were not indexed in the DLI due to one of + the reasons mentioned above. + """ + non_indexed_actions: list[Any] = [] encoded_actions = [] - dead_letter_errors: list[Any] = [] + for action in actions: + if ( + "http" not in action # no http status: connection error + or action["http"]["response"]["status_code"] in _retryable_http_status_codes + ): + # We don't want to forward this action to + # the dead letter index. + # + # Add the action to the list of non-indexed + # actions and continue with the next one. + non_indexed_actions.append(action) + continue + # Reshape event to dead letter index encoded = self._encode_dead_letter(action) if not encoded: shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action}) - dead_letter_errors.append(action) + non_indexed_actions.append(action) encoded_actions.append(encoded) # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: - return dead_letter_errors + return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) failed = self._handle_outcome(actions=encoded_actions, errors=errors) if not isinstance(failed, list) or len(failed) == 0: - return dead_letter_errors + return non_indexed_actions for action in failed: event_payload = self._decode_dead_letter(action) @@ -271,9 +356,9 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action}) continue - dead_letter_errors.append(event_payload) + non_indexed_actions.append(event_payload) - return dead_letter_errors + return non_indexed_actions def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in outcome or "error" not in outcome: @@ -281,15 +366,22 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: # Assign random id in case bulk() results in error, it can be matched to the original # action - return { + encoded = { "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "_id": f"{uuid.uuid4()}", + "_id": str(uuid.uuid4()), "_index": self._es_dead_letter_index, "_op_type": "create", "message": json_dumper(outcome["action"]), "error": outcome["error"], } + if "http" in outcome: + # the `http.response.status_code` is not + # always present in the error field. 
+ encoded["http"] = outcome["http"] + + return encoded + def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: return {} diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index fc15d227..c6654adb 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4292,10 +4292,11 @@ def test_es_dead_letter_index(self) -> None: assert res["hits"]["total"] == {"value": 1, "relation": "eq"} assert ( - res["hits"]["hits"][0]["_source"]["error"]["reason"] + res["hits"]["hits"][0]["_source"]["error"]["message"] == "test_es_non_indexable_dead_letter_index fail message" ) assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" + assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) assert dead_letter_message["log"]["offset"] == 0 assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path @@ -4419,3 +4420,116 @@ def test_es_non_indexable_dead_letter_index(self) -> None: assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + + def test_es_dead_letter_index_with_retryable_errors(self) -> None: + """ + Test that retryable errors are not redirected to the dead letter index (DLI). + """ + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + # This IP address is non-routable and + # will always result in a connection failure. 
diff --git a/tests/shippers/test_es.py b/tests/shippers/test_es.py
index aab8c345..dcf774ca 100644
--- a/tests/shippers/test_es.py
+++ b/tests/shippers/test_es.py
@@ -511,3 +511,50 @@ def test_dumps(self) -> None:
         with self.subTest("dumps dict"):
             dumped = json_serializer.dumps({"key": "value"})
             assert '{"key":"value"}' == dumped
+
+
+@pytest.mark.unit
+class TestParseError(TestCase):
+
+    def test_parse_error(self) -> None:
+        shipper = ElasticsearchShipper(
+            elasticsearch_url="elasticsearch_url",
+            username="username",
+            password="password",
+            tags=["tag1", "tag2", "tag3"],
+        )
+
+        with self.subTest("fail_processor_exception"):
+            error = shipper._parse_error(
+                {
+                    "status": 500,
+                    "error": {
+                        "type": "fail_processor_exception",
+                        "reason": "Fail message",
+                    },
+                },
+            )
+
+            assert error["error"]["type"] == "fail_processor_exception"
+            assert error["error"]["message"] == "Fail message"
+            assert error["http"]["response"]["status_code"] == 500
+
+        with self.subTest("connection_error"):
+            error = shipper._parse_error(
+                {
+                    "status": "N/A",
+                    "error": "whatever",
+                    "exception": elasticsearch.exceptions.ConnectionError("Connection error"),
+                }
+            )
+
+            assert error["error"]["type"] == str(elasticsearch.exceptions.ConnectionError)
+            assert error["error"]["message"] == "whatever"
+            assert "http" not in error
+
+        with self.subTest("unknown_error"):
+            error = shipper._parse_error({})
+
+            assert error["error"]["type"] == "unknown"
+            assert error["error"]["message"] == "Unknown error"
+            assert "http" not in error
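One note on the `connection_error` subtest: `_parse_error` records the exception type via `str(type(exception))`, so the expected value is the string form of the exception class rather than an empty string. A minimal check of that equivalence, assuming the `elasticsearch` package is installed:

[source, python]
----
import elasticsearch

# str(type(instance)) equals str(cls) for the same class object,
# e.g. "<class 'elastic_transport.ConnectionError'>" on elasticsearch-py 8.x.
exc = elasticsearch.exceptions.ConnectionError("Connection error")
assert str(type(exc)) == str(elasticsearch.exceptions.ConnectionError)
----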