Skip to content

Commit

Permalink
F OpenNebula/one-aiops#70: fix scaling functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
MarioRobres committed Jul 2, 2024
1 parent a166b6f commit 2ebae22
Showing 1 changed file with 24 additions and 46 deletions.
70 changes: 24 additions & 46 deletions lithops/serverless/backends/one/one.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ def __init__(self, one_config, internal_storage):


def invoke(self, docker_image_name, runtime_memory, job_payload):
self._get_nodes()
super()._get_nodes()
scale_nodes, pods, chunksize, worker_processes = self._granularity(
job_payload['total_calls']
)

# Scale nodes
# TODO: Add logic to see if it's worth waiting for the cooldown or not
if scale_nodes > len(self.nodes):
self._scale_oneke(self.nodes, scale_nodes)

Expand Down Expand Up @@ -229,50 +228,59 @@ def _check_vms_status(self):
def _granularity(self, total_functions):
    """Choose how many nodes and pods to use for `total_functions` calls.

    Compares the estimated runtime without scaling against the runtime with
    1..N additional worker nodes (including node-creation overhead) and keeps
    the cheapest plan.

    Returns a tuple ``(nodes, pods, chunksize, worker_processes)`` where
    chunksize and worker_processes are fixed to 1.
    """
    # Upper bound on cluster size: user-provided, otherwise a default of 3.
    MAX_NODES = 3
    max_nodes = MAX_NODES if self.maximum_nodes == -1 else self.maximum_nodes
    # TODO: get info from VM template
    cpus_per_new_node = 2
    # TODO: monitor Scaling to set this value
    first_node_creation_time = 90
    additional_node_creation_time = 20

    current_nodes = len(self.nodes)
    # NOTE(review): assumes every node dict carries a numeric-string 'cpu'
    # entry -- confirm against the node provider.
    total_cpus_available = int(sum(float(node['cpu']) for node in self.nodes))
    current_pods = total_cpus_available

    if total_cpus_available > 0:
        estimated_time_no_scaling = (total_functions / total_cpus_available) * self.average_job_execution
    else:
        # No usable CPUs: running without scaling is impossible.
        estimated_time_no_scaling = float('inf')

    best_time = estimated_time_no_scaling
    best_nodes_needed = 0
    # Fix: start from the no-scaling estimate so the log reports a finite
    # value (instead of 'inf') when scaling does not pay off.
    estimated_execution_time = estimated_time_no_scaling

    for additional_nodes in range(1, max_nodes - current_nodes + 1):
        new_total_cpus_available = total_cpus_available + (additional_nodes * cpus_per_new_node)
        estimated_time_with_scaling = (total_functions / new_total_cpus_available) * self.average_job_execution

        # Creation overhead: the very first node of an empty cluster is much
        # slower to bring up than each subsequent one. (The previous
        # additional_nodes == 1 special cases were exact instances of these
        # two formulas and have been folded in.)
        if current_nodes == 0:
            total_creation_time = first_node_creation_time + (additional_nodes - 1) * additional_node_creation_time
        else:
            total_creation_time = additional_node_creation_time * additional_nodes

        total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time

        # Only scale while the extra CPUs can actually be consumed by calls.
        if total_estimated_time_with_scaling < best_time and new_total_cpus_available <= total_functions:
            best_time = total_estimated_time_with_scaling
            best_nodes_needed = additional_nodes
            current_pods = new_total_cpus_available
            estimated_execution_time = estimated_time_with_scaling

    nodes = current_nodes + best_nodes_needed
    # Never schedule more pods than there are calls to run.
    pods = min(total_functions, current_pods)

    logger.info(
        f"Nodes: {nodes}, Pods: {pods}, Chunksize: 1, Worker Processes: 1"
    )
    logger.info(
        f"Estimated Execution Time (without creation): {estimated_execution_time:.2f} seconds"
    )
    return nodes, pods, 1, 1



def _scale_oneke(self, nodes, scale_nodes):
logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes")
# Ensure the service can be scaled
Expand All @@ -284,34 +292,4 @@ def _scale_oneke(self, nodes, scale_nodes):
logger.info("OneKE service is in 'COOLDOWN' state and does not need to be scaled")
return
self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes))
self._wait_for_oneke('COOLDOWN')


def _get_nodes(self):
self.nodes = []
list_all_nodes = self.core_api.list_node()
for node in list_all_nodes.items:
if node.spec.taints:
continue

total_cpu = node.status.allocatable['cpu']
used_cpu = node.status.capacity['cpu']
total_memory = node.status.allocatable['memory']
used_memory = node.status.capacity['memory']

if 'm' in total_cpu:
total_cpu = int(re.search(r'\d+', total_cpu).group()) / 1000
if 'm' in used_cpu:
used_cpu = int(re.search(r'\d+', used_cpu).group()) / 1000
if 'Ki' in total_memory:
total_memory = int(re.search(r'\d+', total_memory).group()) / 1024
if 'Ki' in used_memory:
used_memory = int(re.search(r'\d+', used_memory).group()) / 1024

self.nodes.append({
"name": node.metadata.name,
"total_cpu": total_cpu,
"total_memory": total_memory,
"used_cpu": used_cpu,
"used_memory": used_memory
})
self._wait_for_oneke('COOLDOWN')

0 comments on commit 2ebae22

Please sign in to comment.