Skip to content

Commit

Permalink
Improve HyperParameterOptimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
allegroai committed Aug 27, 2020
1 parent b25ca9b commit f11c6f5
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 36 deletions.
7 changes: 5 additions & 2 deletions trains/automation/hpbandster/bandster.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def __init__(
total_max_jobs, # type: Optional[int]
pool_period_min=2., # type: float
time_limit_per_job=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
local_port=9090, # type: int
**bohb_kwargs # type: Any
):
Expand Down Expand Up @@ -163,6 +164,8 @@ def __init__(
:param float pool_period_min: time in minutes between two consecutive pools
:param float time_limit_per_job: Optional, maximum execution time per single job in minutes,
when time limit is exceeded job is aborted
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param int local_port: default port 9090 tcp, this is a must for the BOHB workers to communicate, even locally.
:param bohb_kwargs: arguments passed directly to the BOHB object
"""
Expand All @@ -178,8 +181,8 @@ def __init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, min_iteration_per_job=min_iteration_per_job,
total_max_jobs=total_max_jobs)
compute_time_limit=compute_time_limit, max_iteration_per_job=max_iteration_per_job,
min_iteration_per_job=min_iteration_per_job, total_max_jobs=total_max_jobs)
self._max_iteration_per_job = max_iteration_per_job
self._min_iteration_per_job = min_iteration_per_job
verified_bohb_kwargs = ['eta', 'min_budget', 'max_budget', 'min_points_in_model', 'top_n_percent',
Expand Down
141 changes: 112 additions & 29 deletions trains/automation/optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from logging import getLogger
from threading import Thread, Event
from time import time
from typing import Union, Any, Sequence, Optional, Mapping, Callable
from typing import Dict, Set, Tuple, Union, Any, Sequence, Optional, Mapping, Callable

from .job import TrainsJob
from .parameters import Parameter
from ..logger import Logger
from ..backend_api.services import workers as workers_service, tasks as tasks_services
from ..task import Task

logger = getLogger('trains.automation.optimization')
Expand Down Expand Up @@ -212,14 +214,11 @@ def to_dict(self):
# returned dict is Mapping[Union['jobs', 'iterations', 'compute_time'], Mapping[Union['limit', 'used'], float]]
current_budget = {}
jobs = self.jobs.used
if jobs:
current_budget['jobs'] = {'limit': self.jobs.limit, 'used': jobs}
current_budget['jobs'] = {'limit': self.jobs.limit, 'used': jobs if jobs else 0}
iterations = self.iterations.used
if iterations:
current_budget['iterations'] = {'limit': self.iterations.limit, 'used': iterations}
current_budget['iterations'] = {'limit': self.iterations.limit, 'used': iterations if iterations else 0}
compute_time = self.compute_time.used
if compute_time:
current_budget['compute_time'] = {'limit': self.compute_time.limit, 'used': compute_time}
current_budget['compute_time'] = {'limit': self.compute_time.limit, 'used': compute_time if compute_time else 0}
return current_budget


Expand All @@ -239,6 +238,7 @@ def __init__(
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
time_limit_per_job=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
min_iteration_per_job=None, # type: Optional[int]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
Expand All @@ -256,6 +256,8 @@ def __init__(
:param float pool_period_min: The time between two consecutive pools (minutes).
:param float time_limit_per_job: The maximum execution time per single job in minutes. When time limit is
exceeded, the job is aborted. (Optional)
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param int min_iteration_per_job: The minimum iterations (of the Objective metric) per single job (Optional)
:param int max_iteration_per_job: The maximum iterations (of the Objective metric) per single job.
When maximum iterations is exceeded, the job is aborted. (Optional)
Expand All @@ -270,6 +272,7 @@ def __init__(
self._num_concurrent_workers = num_concurrent_workers
self.pool_period_minutes = pool_period_min
self.time_limit_per_job = time_limit_per_job
self.compute_time_limit = compute_time_limit
self.max_iteration_per_job = max_iteration_per_job
self.min_iteration_per_job = min_iteration_per_job
self.total_max_jobs = total_max_jobs
Expand All @@ -283,8 +286,7 @@ def __init__(
self._job_project = {}
self.budget = Budget(
jobs_limit=self.total_max_jobs,
compute_time_limit=self.total_max_jobs * self.time_limit_per_job if
self.time_limit_per_job and self.total_max_jobs else None,
compute_time_limit=self.compute_time_limit if self.compute_time_limit else None,
iterations_limit=self.total_max_jobs * self.max_iteration_per_job if
self.max_iteration_per_job and self.total_max_jobs else None
)
Expand Down Expand Up @@ -402,6 +404,15 @@ def monitor_job(self, job):
if elapsed > self.time_limit_per_job:
abort_job = True

if self.compute_time_limit:
if not self.time_limit_per_job:
elapsed = job.elapsed() / 60.
if elapsed > 0:
self.budget.compute_time.update(job.task_id(), elapsed)
self.budget.compute_time.update(job.task_id(), job.elapsed() / 60.)
if self.budget.compute_time.used and self.compute_time_limit < self.budget.compute_time.used:
abort_job = True

if self.max_iteration_per_job:
iterations = self._get_job_iterations(job)
if iterations > 0:
Expand Down Expand Up @@ -640,6 +651,7 @@ def __init__(
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
time_limit_per_job=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
**_ # type: Any
Expand All @@ -656,6 +668,8 @@ def __init__(
:param float pool_period_min: The time between two consecutive pools (minutes).
:param float time_limit_per_job: The maximum execution time per single job in minutes. When the time limit is
exceeded job is aborted. (Optional)
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param int max_iteration_per_job: The maximum iterations (of the Objective metric)
per single job, When exceeded, the job is aborted.
:param int total_max_jobs: The total maximum jobs for the optimization process. The default is ``None``, for
Expand All @@ -665,7 +679,8 @@ def __init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
compute_time_limit=compute_time_limit, max_iteration_per_job=max_iteration_per_job,
total_max_jobs=total_max_jobs, **_)
self._param_iterator = None

def create_job(self):
Expand Down Expand Up @@ -711,6 +726,7 @@ def __init__(
num_concurrent_workers, # type: int
pool_period_min=2., # type: float
time_limit_per_job=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
max_iteration_per_job=None, # type: Optional[int]
total_max_jobs=None, # type: Optional[int]
**_ # type: Any
Expand All @@ -727,6 +743,8 @@ def __init__(
:param float pool_period_min: The time between two consecutive pools (minutes).
:param float time_limit_per_job: The maximum execution time per single job in minutes,
when time limit is exceeded job is aborted. (Optional)
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param int max_iteration_per_job: The maximum iterations (of the Objective metric)
per single job. When exceeded, the job is aborted.
:param int total_max_jobs: The total maximum jobs for the optimization process. The default is ``None``, for
Expand All @@ -736,7 +754,8 @@ def __init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, time_limit_per_job=time_limit_per_job,
max_iteration_per_job=max_iteration_per_job, total_max_jobs=total_max_jobs, **_)
compute_time_limit=compute_time_limit, max_iteration_per_job=max_iteration_per_job,
total_max_jobs=total_max_jobs, **_)
self._hyper_parameters_collection = set()

def create_job(self):
Expand Down Expand Up @@ -787,6 +806,7 @@ def __init__(
max_number_of_concurrent_tasks=10, # type: int
execution_queue='default', # type: str
optimization_time_limit=None, # type: Optional[float]
compute_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: bool
always_create_task=False, # type: bool
**optimizer_kwargs # type: Any
Expand Down Expand Up @@ -815,6 +835,8 @@ def __init__(
:param str execution_queue: The execution queue to use for launching Tasks (experiments).
:param float optimization_time_limit: The maximum time (minutes) for the entire optimization process. The
default is ``None``, indicating no time limit.
:param float compute_time_limit: The maximum compute time in minutes. When time limit is exceeded,
all jobs aborted. (Optional)
:param bool auto_connect_task: Store optimization arguments and configuration in the Task?
The values are:
Expand Down Expand Up @@ -893,6 +915,7 @@ def __init__(
max_number_of_concurrent_tasks=max_number_of_concurrent_tasks,
execution_queue=execution_queue,
optimization_time_limit=optimization_time_limit,
compute_time_limit=compute_time_limit,
optimizer_kwargs=optimizer_kwargs)
# make sure all the created tasks are our children, as we are creating them
if self._task:
Expand All @@ -916,7 +939,8 @@ def __init__(
self.optimizer = optimizer_class(
base_task_id=opts['base_task_id'], hyper_parameters=hyper_parameters,
objective_metric=self.objective_metric, execution_queue=opts['execution_queue'],
num_concurrent_workers=opts['max_number_of_concurrent_tasks'], **opts.get('optimizer_kwargs', {}))
num_concurrent_workers=opts['max_number_of_concurrent_tasks'],
compute_time_limit=opts['compute_time_limit'], **opts.get('optimizer_kwargs', {}))
self.optimizer.set_optimizer_task(self._task)
self.optimization_timeout = None
self.optimization_start_time = None
Expand Down Expand Up @@ -1227,10 +1251,8 @@ def _daemon(self):

def _report_daemon(self):
# type: () -> ()
worker_to_series = {}
title, series = self.objective_metric.get_objective_metric()
title = '{}/{}'.format(title, series)
series = 'machine:'
counter = 0
completed_jobs = dict()
best_experiment = float('-inf'), None
Expand All @@ -1256,20 +1278,6 @@ def _report_daemon(self):

# do some reporting

# running objective, per machine
running_job_ids = set()
for j in self.optimizer.get_running_jobs():
worker = j.worker()
running_job_ids.add(j.task_id())
if worker not in worker_to_series:
worker_to_series[worker] = len(worker_to_series) + 1
machine_id = worker_to_series[worker]
value = self.objective_metric.get_objective(j)
if value is not None:
task_logger.report_scalar(
title=title, series='{}{}'.format(series, machine_id),
iteration=counter, value=value)

# noinspection PyBroadException
try:
budget = self.optimizer.budget.to_dict()
Expand All @@ -1289,8 +1297,10 @@ def _report_daemon(self):
(self.optimization_timeout - self.optimization_start_time)), ndigits=1)
)

self._report_resources(task_logger, counter)
# collect a summary of all the jobs and their final objective values
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - running_job_ids
cur_completed_jobs = set(self.optimizer.get_created_jobs_ids().keys()) - \
{j.task_id() for j in self.optimizer.get_running_jobs()}
if cur_completed_jobs != set(completed_jobs.keys()):
pairs = []
labels = []
Expand All @@ -1315,6 +1325,8 @@ def _report_daemon(self):
c = completed_jobs[job_id]
self._experiment_completed_cb(job_id, c[0], c[1], c[2], best_experiment[1])

self._report_completed_tasks_best_results(completed_jobs, task_logger, title, counter)

if pairs:
print('Updating job performance summary plot/table')

Expand Down Expand Up @@ -1345,3 +1357,74 @@ def _report_daemon(self):
# we should leave
self.stop()
return

def _report_completed_tasks_best_results(self, completed_jobs, task_logger, title, counter):
# type: (Dict[str, Tuple[float, int, Dict[str, int]]], Logger, str, int) -> ()
if completed_jobs:
value_func, series_name = (max, "max") if self.objective_metric.get_objective_sign() > 0 else \
(min, "min")
task_logger.report_scalar(
title=title,
series=series_name,
iteration=counter,
value=value_func([val[0] for val in completed_jobs.values()]))
latest_completed = self._get_latest_completed_task_value(set(completed_jobs.keys()))
if latest_completed:
task_logger.report_scalar(
title=title,
series="last reported",
iteration=counter,
value=latest_completed)

def _report_resources(self, task_logger, iteration):
# type: (Logger, int) -> ()
self._report_active_workers(task_logger, iteration)
self._report_tasks_status(task_logger, iteration)

def _report_active_workers(self, task_logger, iteration):
# type: (Logger, int) -> ()
cur_task = self._task or Task.current_task()
res = cur_task.send(workers_service.GetAllRequest())
response = res.wait()
if response.ok():
all_workers = response
queue_workers = len(
[
worker.get("id")
for worker in all_workers.response_data.get("workers")
for q in worker.get("queues")
if q.get("name") == self.execution_queue
]
)
task_logger.report_scalar(title="resources", series="queue workers", iteration=iteration, value=queue_workers)

def _report_tasks_status(self, task_logger, iteration):
# type: (Logger, int) -> ()
tasks_status = {"running tasks": 0, "pending tasks": 0}
for job in self.optimizer.get_running_jobs():
if job.is_running():
tasks_status["running tasks"] += 1
else:
tasks_status["pending tasks"] += 1
for series, val in tasks_status.items():
task_logger.report_scalar(
title="resources", series=series,
iteration=iteration, value=val)

def _get_latest_completed_task_value(self, cur_completed_jobs):
# type: (Set[str]) -> float
completed_value = None
latest_completed = None
cur_task = self._task or Task.current_task()
for j in cur_completed_jobs:
res = cur_task.send(tasks_services.GetByIdRequest(task=j))
response = res.wait()
if not response.ok() or response.response_data["task"].get("status") != Task.TaskStatusEnum.completed:
continue
completed_time = datetime.strptime(response.response_data["task"]["completed"].partition("+")[0],
"%Y-%m-%dT%H:%M:%S.%f")
completed_time = completed_time.timestamp()
if not latest_completed or completed_time > latest_completed:
latest_completed = completed_time
completed_value = self.objective_metric.get_objective(j)
return completed_value
Loading

0 comments on commit f11c6f5

Please sign in to comment.