From d1c45470bdff60d8a3737556ae33c6c418b1b49b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constan=C3=A7a=20Manteigas?= <113898685+constanca-m@users.noreply.github.com> Date: Tue, 14 May 2024 17:56:14 +0200 Subject: [PATCH] Revert "Place messages from misconfigured IDs in the replay queue (#711)" This reverts commit 45e6c3586edd0b0513b79f355a26bc18cb00ed9b. --- CHANGELOG.md | 4 - handlers/aws/cloudwatch_logs_trigger.py | 47 +++--- handlers/aws/handler.py | 133 ++++------------ handlers/aws/kinesis_trigger.py | 58 +++---- handlers/aws/s3_sqs_trigger.py | 59 +++---- handlers/aws/sqs_trigger.py | 57 +++---- handlers/aws/utils.py | 5 +- how-to-test-locally/.env | 11 -- how-to-test-locally/README.md | 69 -------- how-to-test-locally/Taskfile.yaml | 49 ------ share/version.py | 2 +- tests/handlers/aws/test_handler.py | 16 +- tests/handlers/aws/test_integrations.py | 200 ------------------------ 13 files changed, 128 insertions(+), 582 deletions(-) delete mode 100644 how-to-test-locally/.env delete mode 100644 how-to-test-locally/README.md delete mode 100644 how-to-test-locally/Taskfile.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 02c81b06..3f3ca873 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,3 @@ -### v1.14.0 - 2024/05/07 -##### Bug fixes -* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711). - ### v1.13.1 - 2024/03/07 ##### Features * Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642) diff --git a/handlers/aws/cloudwatch_logs_trigger.py b/handlers/aws/cloudwatch_logs_trigger.py index 129978cb..b90eb0b1 100644 --- a/handlers/aws/cloudwatch_logs_trigger.py +++ b/handlers/aws/cloudwatch_logs_trigger.py @@ -22,16 +22,15 @@ def _from_awslogs_data_to_event(awslogs_data: str) -> Any: return json_parser(cloudwatch_logs_payload_plain) -def _handle_cloudwatch_logs_move( +def _handle_cloudwatch_logs_continuation( sqs_client: BotoBaseClient, - sqs_destination_queue: str, + sqs_continuing_queue: str, + last_ending_offset: Optional[int], + last_event_expanded_offset: Optional[int], cloudwatch_logs_event: dict[str, Any], - input_id: str, + current_log_event: int, + event_input_id: str, config_yaml: str, - continuing_queue: bool = True, - current_log_event: int = 0, - last_ending_offset: Optional[int] = None, - last_event_expanded_offset: Optional[int] = None, ) -> None: """ Handler of the continuation queue for cloudwatch logs inputs @@ -52,7 +51,7 @@ def _handle_cloudwatch_logs_move( message_attributes = { "config": {"StringValue": config_yaml, "DataType": "String"}, "originalEventId": {"StringValue": log_event["id"], "DataType": "String"}, - "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, "originalLogGroup": {"StringValue": log_group_name, "DataType": "String"}, "originalLogStream": {"StringValue": log_stream_name, "DataType": "String"}, "originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"}, @@ -71,31 +70,21 @@ def _handle_cloudwatch_logs_move( } sqs_client.send_message( - QueueUrl=sqs_destination_queue, + QueueUrl=sqs_continuing_queue, MessageBody=log_event["message"], MessageAttributes=message_attributes, ) - if continuing_queue: - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": 
sqs_destination_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "event_id": log_event["id"], - "event_timestamp": log_event["timestamp"], - }, - ) - else: - shared_logger.debug( - "replaying", - extra={ - "sqs_replaying_queue": sqs_destination_queue, - "event_id": log_event["id"], - "event_timestamp": log_event["timestamp"], - }, - ) + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_continuing_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "event_id": log_event["id"], + "event_timestamp": log_event["timestamp"], + }, + ) def _handle_cloudwatch_logs_event( diff --git a/handlers/aws/handler.py b/handlers/aws/handler.py index 913a6469..626fc8ad 100644 --- a/handlers/aws/handler.py +++ b/handlers/aws/handler.py @@ -13,13 +13,13 @@ from .cloudwatch_logs_trigger import ( _from_awslogs_data_to_event, + _handle_cloudwatch_logs_continuation, _handle_cloudwatch_logs_event, - _handle_cloudwatch_logs_move, ) -from .kinesis_trigger import _handle_kinesis_move, _handle_kinesis_record +from .kinesis_trigger import _handle_kinesis_continuation, _handle_kinesis_record from .replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event -from .s3_sqs_trigger import _handle_s3_sqs_event, _handle_s3_sqs_move -from .sqs_trigger import _handle_sqs_event, handle_sqs_move +from .s3_sqs_trigger import _handle_s3_sqs_continuation, _handle_s3_sqs_event +from .sqs_trigger import _handle_sqs_continuation, _handle_sqs_event from .utils import ( CONFIG_FROM_PAYLOAD, INTEGRATION_SCOPE_GENERIC, @@ -130,10 +130,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex sent_events: int = 0 empty_events: int = 0 skipped_events: int = 0 - error_events: int = 0 - - sqs_replaying_queue = os.environ["SQS_REPLAY_URL"] - sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] if trigger_type == "cloudwatch-logs": cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"]) @@ -148,25 +144,8 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex ) if event_input is None: - shared_logger.error("no input defined", extra={"input_id": input_id}) - error_events += 1 - _handle_cloudwatch_logs_move( - sqs_client=sqs_client, - sqs_destination_queue=sqs_replaying_queue, - cloudwatch_logs_event=cloudwatch_logs_event, - input_id=input_id, - config_yaml=config_yaml, - continuing_queue=False, - ) - shared_logger.info( - "lambda is going to shutdown", - extra={ - "error_events": error_events, - "sent_events": sent_events, - "empty_events": empty_events, - "skipped_events": skipped_events, - }, - ) + shared_logger.warning("no input defined", extra={"input_type": trigger_type, "input_id": input_id}) + return "completed" aws_region = input_id.split(":")[3] @@ -201,6 +180,8 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex empty_events += 1 if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period: + sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] + shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ @@ -213,14 +194,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() - _handle_cloudwatch_logs_move( + _handle_cloudwatch_logs_continuation( sqs_client=sqs_client, - sqs_destination_queue=sqs_continuing_queue, + 
sqs_continuing_queue=sqs_continuing_queue, last_ending_offset=last_ending_offset, last_event_expanded_offset=last_event_expanded_offset, cloudwatch_logs_event=cloudwatch_logs_event, current_log_event=current_log_event_n, - input_id=input_id, + event_input_id=input_id, config_yaml=config_yaml, ) @@ -229,12 +210,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() shared_logger.info( "lambda processed all the events", - extra={ - "sent_events": sent_events, - "empty_events": empty_events, - "skipped_events": skipped_events, - "error_events": error_events, - }, + extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, ) if trigger_type == "kinesis-data-stream": @@ -242,30 +218,9 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex input_id = lambda_event["Records"][0]["eventSourceARN"] event_input = config.get_input_by_id(input_id) - if event_input is None: - shared_logger.error("no input defined", extra={"input_id": input_id}) - error_events += len(lambda_event["Records"]) - - for kinesis_record in lambda_event["Records"]: - _handle_kinesis_move( - sqs_client=sqs_client, - sqs_destination_queue=sqs_replaying_queue, - kinesis_record=kinesis_record, - event_input_id=input_id, - config_yaml=config_yaml, - continuing_queue=False, - ) + shared_logger.warning("no input defined", extra={"input_id": input_id}) - shared_logger.info( - "lambda is going to shutdown", - extra={ - "sent_events": sent_events, - "empty_events": empty_events, - "skipped_events": skipped_events, - "error_events": error_events, - }, - ) return "completed" composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml) @@ -298,6 +253,8 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex empty_events += 1 if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period: + sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] + shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ @@ -305,7 +262,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex "sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events, - "error_events": error_events, }, ) @@ -319,9 +275,9 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex continuing_last_ending_offset = None continuing_last_event_expanded_offset = None - _handle_kinesis_move( + _handle_kinesis_continuation( sqs_client=sqs_client, - sqs_destination_queue=sqs_continuing_queue, + sqs_continuing_queue=sqs_continuing_queue, last_ending_offset=continuing_last_ending_offset, last_event_expanded_offset=continuing_last_event_expanded_offset, kinesis_record=kinesis_record, @@ -334,12 +290,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() shared_logger.info( "lambda processed all the events", - extra={ - "sent_events": sent_events, - "empty_events": empty_events, - "skipped_events": skipped_events, - "error_events": error_events, - }, + extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, ) if trigger_type == "s3-sqs" or trigger_type == "sqs": @@ -367,10 +318,12 @@ def handle_timeout( timeout_config_yaml: str, timeout_current_s3_record: int = 0, ) -> None: + timeout_sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] + 
shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ - "sqs_queue": sqs_continuing_queue, + "sqs_queue": timeout_sqs_continuing_queue, "sent_events": timeout_sent_events, "empty_events": timeout_empty_events, "skipped_events": timeout_skipped_events, @@ -396,24 +349,24 @@ def handle_timeout( continue if timeout_input.type == "s3-sqs": - _handle_s3_sqs_move( + _handle_s3_sqs_continuation( sqs_client=sqs_client, - sqs_destination_queue=sqs_continuing_queue, + sqs_continuing_queue=timeout_sqs_continuing_queue, last_ending_offset=timeout_last_ending_offset, last_event_expanded_offset=timeout_last_event_expanded_offset, sqs_record=timeout_sqs_record, current_s3_record=timeout_current_s3_record, - input_id=timeout_input_id, + event_input_id=timeout_input_id, config_yaml=timeout_config_yaml, ) else: - handle_sqs_move( + _handle_sqs_continuation( sqs_client=sqs_client, - sqs_destination_queue=sqs_continuing_queue, + sqs_continuing_queue=timeout_sqs_continuing_queue, last_ending_offset=timeout_last_ending_offset, last_event_expanded_offset=timeout_last_event_expanded_offset, sqs_record=timeout_sqs_record, - input_id=timeout_input_id, + event_input_id=timeout_input_id, config_yaml=timeout_config_yaml, ) @@ -429,36 +382,15 @@ def handle_timeout( input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"] event_input = config.get_input_by_id(input_id) - if event_input is None: - # This could happen if aws_lambda_event_source_mapping is set correctly, but - # the id on the config.yaml was writen incorrectly. - shared_logger.error("no input defined", extra={"input_id": input_id}) - if trigger_type == "s3-sqs": - _handle_s3_sqs_move( - sqs_client=sqs_client, - sqs_destination_queue=sqs_replaying_queue, - sqs_record=sqs_record, - input_id=input_id, - config_yaml=config_yaml, - continuing_queue=False, - ) - elif trigger_type == "sqs": - handle_sqs_move( - sqs_client=sqs_client, - sqs_destination_queue=sqs_replaying_queue, - sqs_record=sqs_record, - input_id=input_id, - config_yaml=config_yaml, - continuing_queue=False, - ) - error_events += 1 + shared_logger.warning("no input defined", extra={"input_id": input_id}) continue if input_id in composite_shipper_cache: composite_shipper = composite_shipper_cache[input_id] else: composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml) + composite_shipper_cache[event_input.id] = composite_shipper continuing_event_expanded_offset: Optional[int] = None @@ -561,12 +493,7 @@ def handle_timeout( shared_logger.info( "lambda processed all the events", - extra={ - "sent_events": sent_events, - "empty_events": empty_events, - "skipped_events": skipped_events, - "error_events": error_events, - }, + extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, ) return "completed" diff --git a/handlers/aws/kinesis_trigger.py b/handlers/aws/kinesis_trigger.py index dac715ec..f9627dd7 100644 --- a/handlers/aws/kinesis_trigger.py +++ b/handlers/aws/kinesis_trigger.py @@ -13,26 +13,21 @@ from .utils import get_account_id_from_arn, get_kinesis_stream_name_type_and_region_from_arn -def _handle_kinesis_move( +def _handle_kinesis_continuation( sqs_client: BotoBaseClient, - sqs_destination_queue: str, + sqs_continuing_queue: str, + last_ending_offset: Optional[int], + last_event_expanded_offset: Optional[int], kinesis_record: dict[str, Any], event_input_id: str, config_yaml: str, - continuing_queue: bool = True, - last_ending_offset: Optional[int] 
= None, - last_event_expanded_offset: Optional[int] = None, ) -> None: """ - Handler of the continuation/replay queue for kinesis data stream inputs. - If a kinesis data stream records batch cannot be fully processed before the timeout of the lambda, the handler will - be called for the continuation queue: it will send new sqs messages for the unprocessed records to the - internal continuing sqs queue. - If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, - so it can get placed in the internal replay queue. - - :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing - queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. + Handler of the continuation queue for kinesis data stream inputs + If a kinesis data stream records batch cannot be fully processed before the + timeout of the lambda this handler will be called: it will + send new sqs messages for the unprocessed records in the batch to the + internal continuing sqs queue """ sequence_number = kinesis_record["kinesis"]["sequenceNumber"] @@ -65,33 +60,22 @@ def _handle_kinesis_move( kinesis_data: str = kinesis_record["kinesis"]["data"] sqs_client.send_message( - QueueUrl=sqs_destination_queue, + QueueUrl=sqs_continuing_queue, MessageBody=kinesis_data, MessageAttributes=message_attributes, ) - if continuing_queue: - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_destination_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "partition_key": partition_key, - "approximate_arrival_timestamp": approximate_arrival_timestamp, - "sequence_number": sequence_number, - }, - ) - else: - shared_logger.debug( - "replaying", - extra={ - "sqs_replaying_queue": sqs_destination_queue, - "partition_key": partition_key, - "approximate_arrival_timestamp": approximate_arrival_timestamp, - "sequence_number": sequence_number, - }, - ) + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_continuing_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "partition_key": partition_key, + "approximate_arrival_timestamp": approximate_arrival_timestamp, + "sequence_number": sequence_number, + }, + ) def _handle_kinesis_record( diff --git a/handlers/aws/s3_sqs_trigger.py b/handlers/aws/s3_sqs_trigger.py index c23c4296..c981efe1 100644 --- a/handlers/aws/s3_sqs_trigger.py +++ b/handlers/aws/s3_sqs_trigger.py @@ -20,27 +20,22 @@ ) -def _handle_s3_sqs_move( +def _handle_s3_sqs_continuation( sqs_client: BotoBaseClient, - sqs_destination_queue: str, + sqs_continuing_queue: str, + last_ending_offset: Optional[int], + last_event_expanded_offset: Optional[int], sqs_record: dict[str, Any], - input_id: str, + current_s3_record: int, + event_input_id: str, config_yaml: str, - current_s3_record: int = 0, - continuing_queue: bool = True, - last_ending_offset: Optional[int] = None, - last_event_expanded_offset: Optional[int] = None, ) -> None: """ - Handler of the continuation/replay queue for s3-sqs inputs. - If a sqs message cannot be fully processed before the timeout of the lambda, the handler will be called - for the continuation queue: it will send new sqs messages for the unprocessed records to the - internal continuing sqs queue. 
- If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, - so it can get placed in the internal replay queue. - - :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing - queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. + Handler of the continuation queue for s3-sqs inputs + If a sqs message cannot be fully processed before the + timeout of the lambda this handler will be called: it will + send new sqs messages for the unprocessed records to the + internal continuing sqs queue """ body = json_parser(sqs_record["body"]) @@ -56,33 +51,23 @@ def _handle_s3_sqs_move( sqs_record["body"] = json_dumper(body) sqs_client.send_message( - QueueUrl=sqs_destination_queue, + QueueUrl=sqs_continuing_queue, MessageBody=sqs_record["body"], MessageAttributes={ "config": {"StringValue": config_yaml, "DataType": "String"}, - "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, }, ) - if continuing_queue: - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_destination_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "current_s3_record": current_s3_record, - }, - ) - else: - shared_logger.debug( - "replaying", - extra={ - "sqs_replaying_queue": sqs_destination_queue, - "input_id": input_id, - "message_id": sqs_record["messageId"], - }, - ) + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_continuing_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "current_s3_record": current_s3_record, + }, + ) def _handle_s3_sqs_event( diff --git a/handlers/aws/sqs_trigger.py b/handlers/aws/sqs_trigger.py index fdb4faf9..c8fbb01f 100644 --- a/handlers/aws/sqs_trigger.py +++ b/handlers/aws/sqs_trigger.py @@ -13,26 +13,21 @@ from .utils import get_account_id_from_arn, get_queue_url_from_sqs_arn, get_sqs_queue_name_and_region_from_arn -def handle_sqs_move( +def _handle_sqs_continuation( sqs_client: BotoBaseClient, - sqs_destination_queue: str, + sqs_continuing_queue: str, + last_ending_offset: Optional[int], + last_event_expanded_offset: Optional[int], sqs_record: dict[str, Any], - input_id: str, + event_input_id: str, config_yaml: str, - continuing_queue: bool = True, - last_ending_offset: Optional[int] = None, - last_event_expanded_offset: Optional[int] = None, ) -> None: """ - Handler of the continuation/replay queue for sqs inputs. - If a sqs message cannot be fully processed before the timeout of the lambda, the handler will be called - for the continuation queue: it will send new sqs messages for the unprocessed records to the - internal continuing sqs queue. - If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, - so it can get placed in the internal replay queue. - - :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing - queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. 
+ Handler of the continuation queue for sqs inputs + If a sqs message cannot be fully processed before the + timeout of the lambda this handler will be called: it will + send new sqs messages for the unprocessed records to the + internal continuing sqs queue """ message_attributes = {} @@ -53,7 +48,7 @@ def handle_sqs_move( "StringValue": str(sqs_record["attributes"]["SentTimestamp"]), "DataType": "Number", }, - "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, } if last_ending_offset is not None: @@ -66,30 +61,20 @@ def handle_sqs_move( } sqs_client.send_message( - QueueUrl=sqs_destination_queue, + QueueUrl=sqs_continuing_queue, MessageBody=sqs_record["body"], MessageAttributes=message_attributes, ) - if continuing_queue: - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_destination_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "message_id": sqs_record["messageId"], - }, - ) - else: - shared_logger.debug( - "replaying", - extra={ - "sqs_replaying_queue": sqs_destination_queue, - "input_id": input_id, - "message_id": sqs_record["messageId"], - }, - ) + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_continuing_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "message_id": sqs_record["messageId"], + }, + ) def _handle_sqs_event( diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py index f952ef53..4f0cf9b7 100644 --- a/handlers/aws/utils.py +++ b/handlers/aws/utils.py @@ -397,9 +397,6 @@ def get_input_from_log_group_subscription_data( all_regions = get_ec2_client().describe_regions(AllRegions=True) assert "Regions" in all_regions for region_data in all_regions["Regions"]: - - # arn:aws:logs:region:account-id:log-group:log_group_name:* - region = region_data["RegionName"] aws_or_gov = "aws" @@ -421,7 +418,7 @@ def get_input_from_log_group_subscription_data( if event_input is not None: return log_group_arn, event_input - return f"arn:aws:logs:%AWS_REGION%:{account_id}:log-group:{log_group_name}:*", None + return "", None def delete_sqs_record(sqs_arn: str, receipt_handle: str) -> None: diff --git a/how-to-test-locally/.env b/how-to-test-locally/.env deleted file mode 100644 index f9ab295d..00000000 --- a/how-to-test-locally/.env +++ /dev/null @@ -1,11 +0,0 @@ -# List of requirement files. -# Split them with , and without space, like this: example1.txt,example2.txt -REQUIREMENTS=requirements.txt - -# List of python files/directories to add to the zip file. -# Split them with , and without space, like this: example1.txt,example2.txt -DEPENDENCIES=main_aws.py,handlers,share,storage,shippers - -# Zip filename -FILENAME=local_esf.zip - diff --git a/how-to-test-locally/README.md b/how-to-test-locally/README.md deleted file mode 100644 index f693886d..00000000 --- a/how-to-test-locally/README.md +++ /dev/null @@ -1,69 +0,0 @@ -This is just an example of how to build and run ESF locally. - -## Requirements - -- [Terraform](https://www.terraform.io/) -- (Optional) [Taskfile](https://taskfile.dev/installation/) - - -## Steps - -**Important note**: ESF dependencies have been tested on architecture `x86_64`. Make sure to use it as well. - -### Step 1: Build your dependencies zip file - -You can build your own, or you can choose to run: -```bash -task -``` -To build it automatically. 
- -You can update the task variables in the `.env` file: -- The list of python dependencies, `DEPENDENCIES`. -- The list of python requirement files, `REQUIREMENTS`. -- The name of the zip file, `FILENAME`. - - -### Step 2: Run ESF terraform - -Use the code in [ESF terraform repository](https://github.com/elastic/terraform-elastic-esf). - -> **NOTE**: ESF lambda function is using architecture `x86_64`. - - -Place your `local_esf.zip` (or `` if you changed the value) in the same directory as ESF terraform. - -Go to `esf.tf` file and edit: - -```terraform -locals { - ... - dependencies-file = "local_esf.zip" # value of FILENAME in .env - ... -} -``` - -Remove/comment these lines from `esf.tf` file: - -```terraform -#resource "terraform_data" "curl-dependencies-zip" { -# provisioner "local-exec" { -# command = "curl -L -O ${local.dependencies-bucket-url}/${local.dependencies-file}" -# } -#} -``` - -And fix the now missing dependency in `dependencies-file`: - -```terraform -resource "aws_s3_object" "dependencies-file" { - bucket = local.config-bucket-name - key = local.dependencies-file - source = local.dependencies-file - - depends_on = [aws_s3_bucket.esf-config-bucket] #, terraform_data.curl-dependencies-zip] -} -``` - -Now follow the README file from [ESF terraform repository](https://github.com/elastic/terraform-elastic-esf) on how to configure the remaining necessary variables. You will have to configure `release-version` variable, but it will not be relevant to this. You can set any value you want for it. - diff --git a/how-to-test-locally/Taskfile.yaml b/how-to-test-locally/Taskfile.yaml deleted file mode 100644 index 16c96bd1..00000000 --- a/how-to-test-locally/Taskfile.yaml +++ /dev/null @@ -1,49 +0,0 @@ -version: '3' - -env: - # Directory to place the dependencies - just internal to this taskfile - DIR: dependencies - -dotenv: ['.env'] - -tasks: - default: - cmds: - - task: install-requirements - - task: build-zip-file - - task: remove-dependencies-dir - - task: add-to-zip - - install-requirements: - desc: "Install requirements from $REQUIREMENTS." - internal: true - requires: - var: REQUIREMENTS - cmds: - - rm -rf $DIR - - for: - var: REQUIREMENTS - split: ',' - cmd: pip3.9 install -r ../{{ .ITEM }} -t $DIR - - build-zip-file: - desc: "Zip $DIR to build $FILENAME." - internal: true - cmds: - - rm -rf $FILENAME - - cd $DIR && zip -r ../$FILENAME . - - remove-dependencies-dir: - desc: "Delete $DIR." - internal: true - cmds: - - rm -rf $DIR - - add-to-zip: - desc: "Add $DEPENDENCIES to zip file." - internal: true - cmds: - - for: - var: DEPENDENCIES - split: ',' - cmd: zip -r $FILENAME ../{{ .ITEM }} diff --git a/share/version.py b/share/version.py index dcd8436d..db306ffb 100644 --- a/share/version.py +++ b/share/version.py @@ -2,4 +2,4 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. 
-version = "1.14.0" +version = "1.13.1" diff --git a/tests/handlers/aws/test_handler.py b/tests/handlers/aws/test_handler.py index de9c83b7..6858d0bd 100644 --- a/tests/handlers/aws/test_handler.py +++ b/tests/handlers/aws/test_handler.py @@ -399,8 +399,6 @@ def test_lambda_handler_noop(self) -> None: with self.subTest("no originalEventSourceARN in messageAttributes"): ctx = ContextMock() os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" - os.environ["SQS_REPLAY_URL"] = "https://sqs.eu-central-1.amazonaws.com/123456789012/replay_queue" - os.environ["SQS_CONTINUE_URL"] = "https://sqs.eu-central-1.amazonaws.com/123456789012/continue_queue" lambda_event = deepcopy(_dummy_lambda_event) del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"] assert handler(lambda_event, ctx) == "completed" # type:ignore @@ -464,6 +462,20 @@ def test_lambda_handler_noop(self) -> None: del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"] assert handler(lambda_event, ctx) == "completed" # type:ignore + with self.subTest("no input defined for kinesis-data-stream"): + ctx = ContextMock() + os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" + lambda_event = { + "Records": [ + { + "eventSource": "aws:kinesis", + "kinesis": {"data": ""}, + "eventSourceARN": "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream", + } + ] + } + assert handler(lambda_event, ctx) == "completed" # type:ignore + with self.subTest("body is neither replay queue nor s3-sqs"): ctx = ContextMock() os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 0b75847a..bf2a632c 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -2602,82 +2602,6 @@ def test_cloudwatch_logs_stream_as_input_instead_of_group(self) -> None: assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000" assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - def test_cloudwatch_logs_no_input_defined(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - _load_file_fixture("cloudwatch-log-2.json"), - _load_file_fixture("cloudwatch-log-3.json"), - ] - - cloudwatch_group_name = _time_based_id(suffix="source-group") - cloudwatch_group = _logs_create_cloudwatch_logs_group(self.logs_client, group_name=cloudwatch_group_name) - - cloudwatch_stream_name = _time_based_id(suffix="source-stream") - _logs_create_cloudwatch_logs_stream( - self.logs_client, group_name=cloudwatch_group_name, stream_name=cloudwatch_stream_name - ) - - _logs_upload_event_to_cloudwatch_logs( - self.logs_client, - group_name=cloudwatch_group_name, - stream_name=cloudwatch_stream_name, - messages_body=fixtures, - ) - - cloudwatch_group_arn = cloudwatch_group["arn"] - cloudwatch_group_name = cloudwatch_group_name - cloudwatch_stream_name = cloudwatch_stream_name - - config_yaml: str = f""" - inputs: - - type: "cloudwatch-logs" - id: "misconfigured-cloudwatch-logs" - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - 
config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_cloudwatch_logs, event_ids_cloudwatch_logs, _ = _logs_retrieve_event_from_cloudwatch_logs( - self.logs_client, cloudwatch_group_name, cloudwatch_stream_name - ) - - ctx = ContextMock() - first_call = handler(events_cloudwatch_logs, ctx) # type:ignore - - assert first_call == "completed" - - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - replayed_messages = replayed_events["Records"] - - assert len(replayed_messages) == 3 - - arn_components = cloudwatch_group_arn.split(":") - arn_components[3] = "%AWS_REGION%" - cloudwatch_group_arn = ":".join(arn_components) - - for message in replayed_messages: - assert message["messageAttributes"]["originalEventSourceARN"]["stringValue"] == cloudwatch_group_arn - def test_cloudwatch_logs_last_ending_offset_reset(self) -> None: assert isinstance(self.logstash, LogstashContainer) assert isinstance(self.localstack, LocalStackContainer) @@ -2905,65 +2829,6 @@ def test_cloudwatch_logs_last_event_expanded_offset_continue(self) -> None: assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000" assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - def test_kinesis_data_stream_no_input_defined(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - _load_file_fixture("cloudwatch-log-2.json"), - _load_file_fixture("cloudwatch-log-3.json"), - ] - - kinesis_stream_name = _time_based_id(suffix="source-kinesis") - kinesis_stream = _kinesis_create_stream(self.kinesis_client, kinesis_stream_name) - kinesis_stream_arn = kinesis_stream["StreamDescription"]["StreamARN"] - - _kinesis_put_records(self.kinesis_client, kinesis_stream_name, fixtures) - - config_yaml: str = f""" - inputs: - - type: "kinesis-data-stream" - id: "misconfigured-id" - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_kinesis, _ = _kinesis_retrieve_event_from_kinesis_stream( - self.kinesis_client, kinesis_stream_name, kinesis_stream_arn - ) - - ctx = ContextMock() - first_call = handler(events_kinesis, ctx) # type:ignore - - assert first_call == "completed" - - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - replayed_messages = replayed_events["Records"] - - assert len(replayed_messages) == 3 - - for message in replayed_messages: - assert kinesis_stream_arn == message["messageAttributes"]["originalEventSourceARN"]["stringValue"] - def 
test_kinesis_data_stream_last_ending_offset_reset(self) -> None: assert isinstance(self.logstash, LogstashContainer) assert isinstance(self.localstack, LocalStackContainer) @@ -3187,71 +3052,6 @@ def test_kinesis_data_stream_last_event_expanded_offset_continue(self) -> None: assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000" assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - def test_sqs_no_input_defined(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - _load_file_fixture("cloudwatch-log-2.json"), - _load_file_fixture("cloudwatch-log-3.json"), - ] - - sqs_queue_name = _time_based_id(suffix="source-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - - for fixture in fixtures: - _sqs_send_messages(self.sqs_client, sqs_queue_url, fixture) - - config_yaml: str = f""" - inputs: - - type: "sqs" - id: "misconfigured-id" - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - messages_sqs = events_sqs["Records"] - - ctx = ContextMock() - first_call = handler(events_sqs, ctx) # type:ignore - - assert first_call == "completed" - - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - replayed_messages = replayed_events["Records"] - - assert len(messages_sqs) == 3 - assert len(replayed_messages) == 3 - for i, message in enumerate(replayed_messages): - assert ( - messages_sqs[i]["eventSourceARN"] - == message["messageAttributes"]["originalEventSourceARN"]["stringValue"] - ) - def test_sqs_last_ending_offset_reset(self) -> None: assert isinstance(self.logstash, LogstashContainer) assert isinstance(self.localstack, LocalStackContainer)
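
For reference, below is a minimal sketch (not part of this patch) of the dual-destination "move" pattern that the reverted commit #711 introduced and that this revert removes: a single helper sends the unprocessed SQS message either to the continuing queue (lambda about to time out) or to the replay queue (message's input id missing from config.yaml), selected by a `continuing_queue` flag. The `send_message` call mirrors the boto3 usage visible in the hunks above; the helper name `sketch_handle_move`, the simplified message attributes, and the plain `logging` logger are illustrative assumptions, since the real handlers build richer per-trigger attributes and log through `shared_logger`.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("esf-sketch")


def sketch_handle_move(
    sqs_client: Any,  # a boto3 SQS client, e.g. boto3.client("sqs")
    sqs_destination_queue: str,  # SQS_CONTINUE_URL or SQS_REPLAY_URL
    message_body: str,
    input_id: str,
    config_yaml: str,
    continuing_queue: bool = True,
    last_ending_offset: Optional[int] = None,
) -> None:
    # Attributes common to both destinations; the real trigger handlers add
    # per-trigger fields (log group/stream, kinesis sequence number, and so on).
    message_attributes: dict[str, Any] = {
        "config": {"StringValue": config_yaml, "DataType": "String"},
        "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
    }
    if last_ending_offset is not None:
        message_attributes["originalLastEndingOffset"] = {
            "StringValue": str(last_ending_offset),
            "DataType": "Number",
        }

    # Same boto3 API call the trigger handlers use; only QueueUrl differs
    # between the continuing and the replaying destination.
    sqs_client.send_message(
        QueueUrl=sqs_destination_queue,
        MessageBody=message_body,
        MessageAttributes=message_attributes,
    )

    if continuing_queue:
        logger.debug("continuing", extra={"sqs_continuing_queue": sqs_destination_queue})
    else:
        logger.debug("replaying", extra={"sqs_replaying_queue": sqs_destination_queue})
```

The revert restores the pre-#711 shape: one `_handle_*_continuation` function per trigger that only ever targets the continuing queue, with misconfigured input ids logged as a warning and skipped rather than placed on the replay queue.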