diff --git a/README.md b/README.md index b5324e74..9c63112e 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ These libraries wrap the the core ElasticGraph libraries so that they can be dep graph LR; elasticgraph-admin_lambda --> rake & elasticgraph-admin & elasticgraph-lambda_support elasticgraph-graphql_lambda --> elasticgraph-graphql & elasticgraph-lambda_support - elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & ox + elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & aws-sdk-cloudwatch & ox elasticgraph-indexer_lambda --> elasticgraph-indexer & elasticgraph-lambda_support & aws-sdk-s3 & ox elasticgraph-lambda_support --> elasticgraph-opensearch & faraday_middleware-aws-sigv4 style elasticgraph-admin_lambda color: DodgerBlue; @@ -112,11 +112,13 @@ graph LR; style elasticgraph-datastore_core color: Green; style aws-sdk-lambda color: Red; style aws-sdk-sqs color: Red; + style aws-sdk-cloudwatch color: Red; style ox color: Red; style elasticgraph-indexer color: Green; style aws-sdk-s3 color: Red; style elasticgraph-opensearch color: Green; style faraday_middleware-aws-sigv4 color: Red; +click aws-sdk-cloudwatch href "https://rubygems.org/gems/aws-sdk-cloudwatch" click aws-sdk-lambda href "https://rubygems.org/gems/aws-sdk-lambda" click aws-sdk-s3 href "https://rubygems.org/gems/aws-sdk-s3" click aws-sdk-sqs href "https://rubygems.org/gems/aws-sdk-sqs" diff --git a/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec b/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec index 2d19185f..15aa7b03 100644 --- a/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec +++ b/elasticgraph-indexer_autoscaler_lambda/elasticgraph-indexer_autoscaler_lambda.gemspec @@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego spec.add_dependency "aws-sdk-lambda", "~> 1.125" spec.add_dependency "aws-sdk-sqs", "~> 1.80" + spec.add_dependency "aws-sdk-cloudwatch", "~> 1.104" # aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+ # we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb index 50bc9a58..02bc3ff2 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda.rb @@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block) def initialize( datastore_core:, sqs_client: nil, - lambda_client: nil + lambda_client: nil, + cloudwatch_client: nil ) @datastore_core = datastore_core @sqs_client = sqs_client @lambda_client = lambda_client + @cloudwatch_client = cloudwatch_client end def sqs_client @@ -53,13 +55,21 @@ def lambda_client end end + def cloudwatch_client + @cloudwatch_client ||= begin + require "aws-sdk-cloudwatch" + Aws::CloudWatch::Client.new + end + end + def concurrency_scaler @concurrency_scaler ||= begin require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler" ConcurrencyScaler.new( datastore_core: @datastore_core, sqs_client: sqs_client, - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb index ab90a8a5..bb8688fe 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler.rb @@ -12,16 +12,25 @@ module ElasticGraph class IndexerAutoscalerLambda # @private class ConcurrencyScaler - def initialize(datastore_core:, sqs_client:, lambda_client:) + def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:) @logger = datastore_core.logger @datastore_core = datastore_core @sqs_client = sqs_client @lambda_client = lambda_client + @cloudwatch_client = cloudwatch_client end MINIMUM_CONCURRENCY = 2 - def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:) + def tune_indexer_concurrency( + queue_urls:, + min_cpu_target:, + max_cpu_target:, + maximum_concurrency:, + required_free_storage_in_mb:, + indexer_function_name:, + cluster_name: + ) queue_attributes = get_queue_attributes(queue_urls) queue_arns = queue_attributes.fetch(:queue_arns) num_messages = queue_attributes.fetch(:total_messages) @@ -37,6 +46,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi new_target_concurrency = if num_messages.positive? + lowest_node_free_storage_in_mb = get_lowest_node_free_storage_in_mb(cluster_name) + cpu_utilization = get_max_cpu_utilization cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0 @@ -45,11 +56,19 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi if current_concurrency.nil? details_logger.log_unset nil + elsif lowest_node_free_storage_in_mb < required_free_storage_in_mb + details_logger.log_pause( + lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb, + required_free_storage_in_mb: required_free_storage_in_mb + ) + MINIMUM_CONCURRENCY elsif cpu_utilization < min_cpu_target increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5) (current_concurrency * increase_factor).round.tap do |new_concurrency| details_logger.log_increase( cpu_utilization: cpu_utilization, + lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb, + required_free_storage_in_mb: required_free_storage_in_mb, current_concurrency: current_concurrency, new_concurrency: new_concurrency ) @@ -59,6 +78,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi (current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency| details_logger.log_decrease( cpu_utilization: cpu_utilization, + lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb, + required_free_storage_in_mb: required_free_storage_in_mb, current_concurrency: current_concurrency, new_concurrency: new_concurrency ) @@ -66,13 +87,15 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi else details_logger.log_no_change( cpu_utilization: cpu_utilization, + lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb, + required_free_storage_in_mb: required_free_storage_in_mb, current_concurrency: current_concurrency ) current_concurrency end else details_logger.log_reset - 0 + MINIMUM_CONCURRENCY end if new_target_concurrency && new_target_concurrency != current_concurrency @@ -94,6 +117,22 @@ def get_max_cpu_utilization end.max.to_f end + def get_lowest_node_free_storage_in_mb(cluster_name) + metric_response = @cloudwatch_client.get_metric_data({ + start_time: ::Time.now - 1200, # past 20 minutes + end_time: ::Time.now, + metric_data_queries: [ + { + id: "minFreeStorageAcrossNodes", + expression: "SEARCH('{AWS/ES,ClientId,DomainName} MetricName=\"FreeStorageSpace\" AND DomainName=\"#{cluster_name}\"', 'Minimum', 60)", + return_data: true + } + ] + }) + + metric_response.metric_data_results.first.values.first + end + def get_queue_attributes(queue_urls) attributes_per_queue = queue_urls.map do |queue_url| @sqs_client.get_queue_attributes( diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb index 94936583..ae6c0371 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/details_logger.rb @@ -30,32 +30,46 @@ def initialize( } end - def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:) + def log_increase(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:) log_result({ "action" => "increase", "cpu_utilization" => cpu_utilization, + "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb, + "required_free_storage_in_mb" => required_free_storage_in_mb, "current_concurrency" => current_concurrency, "new_concurrency" => new_concurrency }) end - def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:) + def log_decrease(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:) log_result({ "action" => "decrease", "cpu_utilization" => cpu_utilization, + "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb, + "required_free_storage_in_mb" => required_free_storage_in_mb, "current_concurrency" => current_concurrency, "new_concurrency" => new_concurrency }) end - def log_no_change(cpu_utilization:, current_concurrency:) + def log_no_change(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:) log_result({ "action" => "no_change", "cpu_utilization" => cpu_utilization, + "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb, + "required_free_storage_in_mb" => required_free_storage_in_mb, "current_concurrency" => current_concurrency }) end + def log_pause(lowest_node_free_storage_in_mb:, required_free_storage_in_mb:) + log_result({ + "action" => "pause", + "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb, + "required_free_storage_in_mb" => required_free_storage_in_mb + }) + end + def log_reset log_result({"action" => "reset"}) end diff --git a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb index f9d550f3..81ecae96 100644 --- a/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb +++ b/elasticgraph-indexer_autoscaler_lambda/lib/elastic_graph/indexer_autoscaler_lambda/lambda_function.rb @@ -26,7 +26,9 @@ def handle_request(event:, context:) min_cpu_target: event.fetch("min_cpu_target"), max_cpu_target: event.fetch("max_cpu_target"), maximum_concurrency: event.fetch("maximum_concurrency"), - indexer_function_name: event.fetch("indexer_function_name") + required_free_storage_in_mb: event.fetch("required_free_storage_in_mb"), + indexer_function_name: event.fetch("indexer_function_name"), + cluster_name: event.fetch("cluster_name") ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs index 29a25142..23a6ecbe 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/concurrency_scaler.rbs @@ -4,7 +4,8 @@ module ElasticGraph def initialize: ( datastore_core: DatastoreCore, sqs_client: Aws::SQS::Client, - lambda_client: Aws::Lambda::Client + lambda_client: Aws::Lambda::Client, + cloudwatch_client: Aws::CloudWatch::Client ) -> void MINIMUM_CONCURRENCY: ::Integer @@ -14,7 +15,9 @@ module ElasticGraph min_cpu_target: ::Integer, max_cpu_target: ::Integer, maximum_concurrency: ::Integer, - indexer_function_name: ::String + required_free_storage_in_mb: ::Integer, + indexer_function_name: ::String, + cluster_name: ::String ) -> void private @@ -23,8 +26,10 @@ module ElasticGraph @datastore_core: DatastoreCore @sqs_client: Aws::SQS::Client @lambda_client: Aws::Lambda::Client + @cloudwatch_client: Aws::CloudWatch::Client def get_max_cpu_utilization: () -> ::Float + def get_lowest_node_free_storage_in_mb: (::String) -> ::Float def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] } def get_concurrency: (::String) -> ::Integer? diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs index b14e081a..855cd38a 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/idexer_autoscaler_lambda/details_logger.rbs @@ -7,26 +7,37 @@ module ElasticGraph queue_urls: ::Array[::String], min_cpu_target: ::Integer, max_cpu_target: ::Integer, - num_messages: ::Integer, + num_messages: ::Integer ) -> void def log_increase: ( cpu_utilization: ::Float, + lowest_node_free_storage_in_mb: ::Float, + required_free_storage_in_mb: ::Integer, current_concurrency: ::Integer, new_concurrency: ::Integer ) -> void def log_decrease: ( cpu_utilization: ::Float, + lowest_node_free_storage_in_mb: ::Float, + required_free_storage_in_mb: ::Integer, current_concurrency: ::Integer, new_concurrency: ::Integer ) -> void def log_no_change: ( cpu_utilization: ::Float, + lowest_node_free_storage_in_mb: ::Float, + required_free_storage_in_mb: ::Integer, current_concurrency: ::Integer ) -> void + def log_pause: ( + lowest_node_free_storage_in_mb: ::Float, + required_free_storage_in_mb: ::Integer + ) -> void + def log_reset: () -> void def log_unset: () -> void diff --git a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs index 0a5260ae..c484a597 100644 --- a/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs +++ b/elasticgraph-indexer_autoscaler_lambda/sig/elastic_graph/indexer_autoscaler_lambda.rbs @@ -11,6 +11,7 @@ module ElasticGraph datastore_core: DatastoreCore, ?sqs_client: Aws::SQS::Client?, ?lambda_client: Aws::Lambda::Client?, + ?cloudwatch_client: Aws::CloudWatch::Client?, ) -> void @sqs_client: Aws::SQS::Client? @@ -19,6 +20,9 @@ module ElasticGraph @lambda_client: Aws::Lambda::Client? def lambda_client: () -> Aws::Lambda::Client + @cloudwatch_client: Aws::CloudWatch::Client? + def cloudwatch_client: () -> Aws::CloudWatch::Client + @concurrency_scaler: ConcurrencyScaler? def concurrency_scaler: () -> ConcurrencyScaler end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb b/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb index 11cf7e90..da3657aa 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/support/builds_indexer_autoscaler.rb @@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda def build_indexer_autoscaler( sqs_client: nil, lambda_client: nil, + cloudwatch_client: nil, **datastore_core_options, &customize_datastore_config ) @@ -28,6 +29,7 @@ def build_indexer_autoscaler( IndexerAutoscalerLambda.new( sqs_client: sqs_client, lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client, datastore_core: datastore_core ) end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb index 1e949061..2e39dcc4 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/concurrency_scaler_spec.rb @@ -8,6 +8,7 @@ require "aws-sdk-lambda" require "aws-sdk-sqs" +require "aws-sdk-cloudwatch" require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler" require "support/builds_indexer_autoscaler" @@ -22,13 +23,16 @@ class IndexerAutoscalerLambda let(:max_cpu_target) { 80 } let(:cpu_midpoint) { 75 } let(:maximum_concurrency) { 1000 } + let(:required_free_storage_in_mb) { 10000 } it "1.5x the concurrency when the CPU usage is significantly below the minimum target" do lambda_client = lambda_client_with_concurrency(200) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(10.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -39,10 +43,12 @@ class IndexerAutoscalerLambda it "increases concurrency by a factor CPU usage when CPU is slightly below the minimum target" do # CPU is at 50% and our target range is 70-80. 75 / 50 = 1.5, so increase it by 50%. lambda_client = lambda_client_with_concurrency(200) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(50.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -53,10 +59,12 @@ class IndexerAutoscalerLambda it "sets concurrency to the max when it cannot be increased anymore when CPU usage is under the limit" do current_concurrency = maximum_concurrency - 1 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(10), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -67,10 +75,12 @@ class IndexerAutoscalerLambda it "decreases concurrency by a factor of the CPU when the CPU usage is over the limit" do # CPU is at 90% and our target range is 70-80. 90 / 75 = 1.2, so decrease it by 20%. lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(90.0), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -81,10 +91,12 @@ class IndexerAutoscalerLambda it "leaves concurrency unchanged when it cannot be decreased anymore when CPU utilization is over the limit" do current_concurrency = 0 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(100), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -94,11 +106,13 @@ class IndexerAutoscalerLambda it "does not adjust concurrency when the CPU is within the target range" do lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) [min_cpu_target, cpu_midpoint, max_cpu_target].each do |cpu_usage| concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(cpu_usage), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -113,10 +127,12 @@ class IndexerAutoscalerLambda expect(high_cpu_usage).to be > max_cpu_target lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(min_cpu_target, high_cpu_usage), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -125,13 +141,30 @@ class IndexerAutoscalerLambda expect(updated_concurrency_requested_from(lambda_client)).to eq [460] # 500 - 8% since 81/75 = 1.08 end + it "pauses the concurrency when free storage space drops below the threshold regardless of cpu" do + lambda_client = lambda_client_with_concurrency(500) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb - 1) + concurrency_scaler = build_concurrency_scaler( + datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1), + sqs_client: sqs_client_with_number_of_messages(1), + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client + ) + + tune_indexer_concurrency(concurrency_scaler) + + expect(updated_concurrency_requested_from(lambda_client)).to eq [2] # 2 is the minimum + end + it "sets concurrency to the min when there are no messages in the queue" do current_concurrency = 500 lambda_client = lambda_client_with_concurrency(current_concurrency) + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(min_cpu_target - 1), sqs_client: sqs_client_with_number_of_messages(0), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -141,12 +174,14 @@ class IndexerAutoscalerLambda it "leaves concurrency unset if it is currently unset" do lambda_client = lambda_client_without_concurrency + cloudwatch_client = cloudwatch_client_with_storage_metrics(required_free_storage_in_mb + 1) # CPU is at 50% and our target range is 70-80. concurrency_scaler = build_concurrency_scaler( datastore_client: datastore_client_with_cpu_usage(50), sqs_client: sqs_client_with_number_of_messages(1), - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ) tune_indexer_concurrency(concurrency_scaler) @@ -204,6 +239,20 @@ def lambda_client_with_concurrency(concurrency) end end + def cloudwatch_client_with_storage_metrics(free_storage) + ::Aws::CloudWatch::Client.new(stub_responses: true).tap do |cloudwatch_client| + cloudwatch_client.stub_responses(:get_metric_data, { + metric_data_results: [ + { + id: "minFreeStorageAcrossNodes", + values: [free_storage.to_f], + timestamps: [::Time.parse("2024-10-30T12:00:00Z")] + } + ] + }) + end + end + # If the lambda is using unreserved concurrency, reserved_concurrent_executions on the Lambda client will be nil. def lambda_client_without_concurrency ::Aws::Lambda::Client.new(stub_responses: true).tap do |lambda_client| @@ -213,11 +262,12 @@ def lambda_client_without_concurrency end end - def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:) + def build_concurrency_scaler(datastore_client:, sqs_client:, lambda_client:, cloudwatch_client:) build_indexer_autoscaler( clients_by_name: {"main" => datastore_client}, sqs_client: sqs_client, - lambda_client: lambda_client + lambda_client: lambda_client, + cloudwatch_client: cloudwatch_client ).concurrency_scaler end @@ -227,7 +277,9 @@ def tune_indexer_concurrency(concurrency_scaler) min_cpu_target: min_cpu_target, max_cpu_target: max_cpu_target, maximum_concurrency: maximum_concurrency, - indexer_function_name: indexer_function_name + required_free_storage_in_mb: required_free_storage_in_mb, + indexer_function_name: indexer_function_name, + cluster_name: "some-eg-cluster" ) end end diff --git a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb index 971e911c..973194f3 100644 --- a/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb +++ b/elasticgraph-indexer_autoscaler_lambda/spec/unit/elastic_graph/indexer_autoscaler_lambda/lambda_function_spec.rb @@ -8,6 +8,7 @@ require "aws-sdk-lambda" require "aws-sdk-sqs" +require "aws-sdk-cloudwatch" require "elastic_graph/spec_support/lambda_function" RSpec.describe "Autoscale indexer lambda function" do @@ -24,9 +25,11 @@ end lambda_client = ::Aws::Lambda::Client.new(stub_responses: true) + cloudwatch_client = ::Aws::CloudWatch::Client.new(stub_responses: true) allow(::Aws::SQS::Client).to receive(:new).and_return(sqs_client) allow(::Aws::Lambda::Client).to receive(:new).and_return(lambda_client) + allow(::Aws::CloudWatch::Client).to receive(:new).and_return(cloudwatch_client) expect_loading_lambda_to_define_constant( lambda: "elastic_graph/indexer_autoscaler_lambda/lambda_function.rb", @@ -37,6 +40,8 @@ "min_cpu_target" => 70, "max_cpu_target" => 80, "maximum_concurrency" => 1000, + "required_free_storage_in_mb" => 100, + "cluster_name" => "some-eg-cluster", "indexer_function_name" => "some-eg-app-indexer" } lambda_function.handle_request(event: event, context: {}) diff --git a/rbs_collection.yaml b/rbs_collection.yaml index 894e4d6f..72d9c100 100644 --- a/rbs_collection.yaml +++ b/rbs_collection.yaml @@ -24,6 +24,8 @@ gems: ignore: true # Use `ignore: false` to tell rbs collection to pull the RBS signatures from these gems. + - name: aws-sdk-cloudwatch + ignore: false - name: aws-sdk-lambda ignore: false - name: aws-sdk-sqs