Revert "Place messages from misconfigured IDs in the replay queue (#711
Browse files Browse the repository at this point in the history
…)"

This reverts commit 45e6c35.
constanca-m authored May 14, 2024
1 parent acbe702 commit d1c4547
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 582 deletions.
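
In short, the change being reverted (#711) made the forwarder treat a record whose eventSourceARN has no matching input id in config.yaml as an error, count it in error_events, and send the original message to the replay queue read from the SQS_REPLAY_URL environment variable; with this revert the forwarder goes back to logging a warning and returning without re-queuing anything. A minimal sketch of the two code paths, assuming a boto3 SQS client; the helper names below are hypothetical and only illustrate the diff that follows:

import os
from typing import Any


def on_unknown_input_before_revert(sqs_client: Any, body: str, input_id: str, config_yaml: str) -> None:
    # Reverted behaviour (#711): report an error and park the original message
    # on the replay queue so it can be retried once config.yaml is fixed.
    sqs_client.send_message(
        QueueUrl=os.environ["SQS_REPLAY_URL"],
        MessageBody=body,
        MessageAttributes={
            "config": {"StringValue": config_yaml, "DataType": "String"},
            "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
        },
    )


def on_unknown_input_after_revert(shared_logger: Any, input_id: str) -> None:
    # Restored behaviour: log a warning and skip the event, nothing is re-queued.
    shared_logger.warning("no input defined", extra={"input_id": input_id})
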
CHANGELOG.md: 4 changes (0 additions, 4 deletions)
@@ -1,7 +1,3 @@
### v1.14.0 - 2024/05/07
##### Bug fixes
* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711).

### v1.13.1 - 2024/03/07
##### Features
* Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642)
handlers/aws/cloudwatch_logs_trigger.py: 47 changes (18 additions, 29 deletions)
@@ -22,16 +22,15 @@ def _from_awslogs_data_to_event(awslogs_data: str) -> Any:
return json_parser(cloudwatch_logs_payload_plain)


def _handle_cloudwatch_logs_move(
def _handle_cloudwatch_logs_continuation(
sqs_client: BotoBaseClient,
sqs_destination_queue: str,
sqs_continuing_queue: str,
last_ending_offset: Optional[int],
last_event_expanded_offset: Optional[int],
cloudwatch_logs_event: dict[str, Any],
input_id: str,
current_log_event: int,
event_input_id: str,
config_yaml: str,
continuing_queue: bool = True,
current_log_event: int = 0,
last_ending_offset: Optional[int] = None,
last_event_expanded_offset: Optional[int] = None,
) -> None:
"""
Handler of the continuation queue for cloudwatch logs inputs
@@ -52,7 +51,7 @@ def _handle_cloudwatch_logs_move(
message_attributes = {
"config": {"StringValue": config_yaml, "DataType": "String"},
"originalEventId": {"StringValue": log_event["id"], "DataType": "String"},
"originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
"originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"},
"originalLogGroup": {"StringValue": log_group_name, "DataType": "String"},
"originalLogStream": {"StringValue": log_stream_name, "DataType": "String"},
"originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"},
@@ -71,31 +70,21 @@
}

sqs_client.send_message(
QueueUrl=sqs_destination_queue,
QueueUrl=sqs_continuing_queue,
MessageBody=log_event["message"],
MessageAttributes=message_attributes,
)

if continuing_queue:
shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_destination_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
else:
shared_logger.debug(
"replaying",
extra={
"sqs_replaying_queue": sqs_destination_queue,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_continuing_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)


def _handle_cloudwatch_logs_event(
handlers/aws/handler.py: 133 changes (30 additions, 103 deletions)
@@ -13,13 +13,13 @@

from .cloudwatch_logs_trigger import (
_from_awslogs_data_to_event,
_handle_cloudwatch_logs_continuation,
_handle_cloudwatch_logs_event,
_handle_cloudwatch_logs_move,
)
from .kinesis_trigger import _handle_kinesis_move, _handle_kinesis_record
from .kinesis_trigger import _handle_kinesis_continuation, _handle_kinesis_record
from .replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
from .s3_sqs_trigger import _handle_s3_sqs_event, _handle_s3_sqs_move
from .sqs_trigger import _handle_sqs_event, handle_sqs_move
from .s3_sqs_trigger import _handle_s3_sqs_continuation, _handle_s3_sqs_event
from .sqs_trigger import _handle_sqs_continuation, _handle_sqs_event
from .utils import (
CONFIG_FROM_PAYLOAD,
INTEGRATION_SCOPE_GENERIC,
@@ -130,10 +130,6 @@
sent_events: int = 0
empty_events: int = 0
skipped_events: int = 0
error_events: int = 0

sqs_replaying_queue = os.environ["SQS_REPLAY_URL"]
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

if trigger_type == "cloudwatch-logs":
cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"])
@@ -148,25 +144,8 @@
)

if event_input is None:
shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += 1
_handle_cloudwatch_logs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
cloudwatch_logs_event=cloudwatch_logs_event,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
shared_logger.info(
"lambda is going to shutdown",
extra={
"error_events": error_events,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
},
)
shared_logger.warning("no input defined", extra={"input_type": trigger_type, "input_id": input_id})

return "completed"

aws_region = input_id.split(":")[3]
@@ -201,6 +180,8 @@
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
@@ -213,14 +194,14 @@

composite_shipper.flush()

_handle_cloudwatch_logs_move(
_handle_cloudwatch_logs_continuation(
sqs_client=sqs_client,
sqs_destination_queue=sqs_continuing_queue,
sqs_continuing_queue=sqs_continuing_queue,
last_ending_offset=last_ending_offset,
last_event_expanded_offset=last_event_expanded_offset,
cloudwatch_logs_event=cloudwatch_logs_event,
current_log_event=current_log_event_n,
input_id=input_id,
event_input_id=input_id,
config_yaml=config_yaml,
)

@@ -229,43 +210,17 @@
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
)

if trigger_type == "kinesis-data-stream":
shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

input_id = lambda_event["Records"][0]["eventSourceARN"]
event_input = config.get_input_by_id(input_id)

if event_input is None:
shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += len(lambda_event["Records"])

for kinesis_record in lambda_event["Records"]:
_handle_kinesis_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
kinesis_record=kinesis_record,
event_input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
shared_logger.warning("no input defined", extra={"input_id": input_id})

shared_logger.info(
"lambda is going to shutdown",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)
return "completed"

composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
@@ -298,14 +253,15 @@
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": sqs_continuing_queue,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

@@ -319,9 +275,9 @@
continuing_last_ending_offset = None
continuing_last_event_expanded_offset = None

_handle_kinesis_move(
_handle_kinesis_continuation(
sqs_client=sqs_client,
sqs_destination_queue=sqs_continuing_queue,
sqs_continuing_queue=sqs_continuing_queue,
last_ending_offset=continuing_last_ending_offset,
last_event_expanded_offset=continuing_last_event_expanded_offset,
kinesis_record=kinesis_record,
@@ -334,12 +290,7 @@
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
)

if trigger_type == "s3-sqs" or trigger_type == "sqs":
@@ -367,10 +318,12 @@ def handle_timeout(
timeout_config_yaml: str,
timeout_current_s3_record: int = 0,
) -> None:
timeout_sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": sqs_continuing_queue,
"sqs_queue": timeout_sqs_continuing_queue,
"sent_events": timeout_sent_events,
"empty_events": timeout_empty_events,
"skipped_events": timeout_skipped_events,
@@ -396,24 +349,24 @@ def handle_timeout(
continue

if timeout_input.type == "s3-sqs":
_handle_s3_sqs_move(
_handle_s3_sqs_continuation(
sqs_client=sqs_client,
sqs_destination_queue=sqs_continuing_queue,
sqs_continuing_queue=timeout_sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
current_s3_record=timeout_current_s3_record,
input_id=timeout_input_id,
event_input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)
else:
handle_sqs_move(
_handle_sqs_continuation(
sqs_client=sqs_client,
sqs_destination_queue=sqs_continuing_queue,
sqs_continuing_queue=timeout_sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
input_id=timeout_input_id,
event_input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)

@@ -429,36 +382,15 @@ def handle_timeout(
input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

event_input = config.get_input_by_id(input_id)

if event_input is None:
# This could happen if aws_lambda_event_source_mapping is set correctly, but
# the id on the config.yaml was writen incorrectly.
shared_logger.error("no input defined", extra={"input_id": input_id})
if trigger_type == "s3-sqs":
_handle_s3_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
elif trigger_type == "sqs":
handle_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
error_events += 1
shared_logger.warning("no input defined", extra={"input_id": input_id})
continue

if input_id in composite_shipper_cache:
composite_shipper = composite_shipper_cache[input_id]
else:
composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)

composite_shipper_cache[event_input.id] = composite_shipper

continuing_event_expanded_offset: Optional[int] = None
@@ -561,12 +493,7 @@ def handle_timeout(

shared_logger.info(
"lambda processed all the events",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
)

return "completed"