From 9112abd09163a189df881fbc72662ad9185e2d99 Mon Sep 17 00:00:00 2001 From: Daniel Prelipcean Date: Tue, 10 Sep 2019 17:25:05 +0200 Subject: [PATCH] serial: partial workflow execution * Add target as operational option. * Addresses #54. Signed-off-by: Daniel Prelipcean --- reana_workflow_engine_serial/tasks.py | 6 +++++- reana_workflow_engine_serial/utils.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/reana_workflow_engine_serial/tasks.py b/reana_workflow_engine_serial/tasks.py index f19089e..252d99a 100644 --- a/reana_workflow_engine_serial/tasks.py +++ b/reana_workflow_engine_serial/tasks.py @@ -25,7 +25,8 @@ copy_workspace_to_cache, load_json, poll_job_status, publish_cache_copy, publish_job_submission, publish_job_success, publish_workflow_failure, - publish_workflow_start, sanitize_command) + publish_workflow_start, sanitize_command, + validate_workflow) rjc_api_client = JobControllerAPIClient('reana-job-controller') @@ -122,6 +123,9 @@ def run(workflow_json, publisher, cache_enabled): """Run a serial workflow.""" + if operational_options: + workflow_json = validate_workflow(workflow_json, operational_options) + expanded_workflow_json = serial_load(None, workflow_json, workflow_parameters) diff --git a/reana_workflow_engine_serial/utils.py b/reana_workflow_engine_serial/utils.py index 9a07395..578186d 100644 --- a/reana_workflow_engine_serial/utils.py +++ b/reana_workflow_engine_serial/utils.py @@ -228,3 +228,25 @@ def publish_workflow_failure(job_id, build_progress_message( failed=failed_jobs) }) + + +def validate_workflow(workflow_json, operational_options): + """Validate the workflow given the operational options.""" + target_step = operational_options.get('target') + + if target_step: + # Target step can be either int or step name (str) + try: + step_number = int(target_step) + except ValueError: + # Get the step names from the workflow + input_steps = list() + for step in workflow_json['steps']: + input_steps.append(step['name']) + + step_number = input_steps.index(target_step) + + # Remove all subsequent steps + workflow_json['steps'] = workflow_json['steps'][:step_number+1] + + return workflow_json