Skip to content

Commit

Permalink
#117 Make output-writing atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Dec 12, 2024
1 parent fb1ba36 commit 1216b35
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions src/retrieval/session/move_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,17 @@ def run(
# CREATE EMPTY OUTPUT DIRECTORY

output_dst = output_dst_successful if day_was_successful else output_dst_failed
output_dst_tmp = output_dst + ".tmp"
if os.path.isdir(output_dst_tmp):
logger.debug("Removing old temporary output")
shutil.rmtree(output_dst_tmp)

# MOVE NEW OUTPUTS

os.makedirs(os.path.dirname(output_dst), exist_ok=True)
os.makedirs(os.path.dirname(output_dst_tmp), exist_ok=True)

if os.path.isdir(output_src_dir):
shutil.copytree(output_src_dir, output_dst)
shutil.copytree(output_src_dir, output_dst_tmp)

# STORE PT OUTPUT DIRECTORY

Expand All @@ -107,24 +111,24 @@ def run(
session.ctx.from_datetime.strftime("%y%m%d"),
)

os.makedirs(os.path.join(output_dst, "analysis"), exist_ok=True)
os.makedirs(os.path.join(output_dst_tmp, "analysis"), exist_ok=True)
if session.retrieval_algorithm == "proffast-1.0":
shutil.copytree(
os.path.join(analysis_dir, "pT"),
os.path.join(output_dst, "analysis", "pT"),
os.path.join(output_dst_tmp, "analysis", "pT"),
)
else:
shutil.copytree(
os.path.join(session.ctn.container_path, "prf", "wrk_fast"),
os.path.join(output_dst, "analysis", "pT"),
os.path.join(output_dst_tmp, "analysis", "pT"),
)

# (OPTIONAL) STORE BINARY SPECTRA

if session.job_settings.store_binary_spectra:
shutil.copytree(
os.path.join(analysis_dir, "cal"),
os.path.join(output_dst, "analysis", "cal"),
os.path.join(output_dst_tmp, "analysis", "cal"),
)
else:
if session.retrieval_algorithm in [
Expand All @@ -136,25 +140,25 @@ def run(
os.makedirs(os.path.join(output_dst, "analysis", "cal"))
shutil.copyfile(
os.path.join(analysis_dir, "cal", "logfile.dat"),
os.path.join(output_dst, "analysis", "cal", "logfile.dat"),
os.path.join(output_dst_tmp, "analysis", "cal", "logfile.dat"),
)

# STORE AUTOMATION LOGS

os.makedirs(os.path.join(output_dst, "logfiles"), exist_ok=True)
os.makedirs(os.path.join(output_dst_tmp, "logfiles"), exist_ok=True)
shutil.copyfile(
logger.logfile_path,
os.path.join(output_dst, "logfiles", "container.log"),
os.path.join(output_dst_tmp, "logfiles", "container.log"),
)
if isinstance(session.ctn, types.Proffast22Container):
shutil.copyfile(
session.ctn.pylot_log_format_path,
os.path.join(output_dst, "pylot_log_format.yml"),
os.path.join(output_dst_tmp, "pylot_log_format.yml"),
)

# STORE AUTOMATION INFO

with open(os.path.join(output_dst, "about.json"), "w") as f:
with open(os.path.join(output_dst_tmp, "about.json"), "w") as f:
now = datetime.datetime.now(datetime.timezone.utc)
dumped_config = config.model_copy(deep=True)
if dumped_config.general.metadata is not None:
Expand All @@ -173,3 +177,9 @@ def run(
"session": session.model_dump(mode="json"),
}
json.dump(about_dict, f, indent=4)

# RENAME TEMPORARY OUTPUT DIRECTORY

# this operation is atomic, i.e., output directories
# with a correct name are always complete
os.rename(output_dst_tmp, output_dst)

0 comments on commit 1216b35

Please sign in to comment.