Skip to content

Commit

Permalink
Reduce resource usage in container tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Jan 31, 2024
1 parent 47ccdf3 commit cea8892
Showing 1 changed file with 40 additions and 34 deletions.
74 changes: 40 additions & 34 deletions tests/retrieval/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ def _run(

atm: src.types.AtmosphericProfileModel
alg: src.types.RetrievalAlgorithm
pending_processes: list[multiprocessing.Process] = []

# start proffast 1.0 first because it takes the longest
pending_jobs: list[tuple[src.types.RetrievalAlgorithm,
src.types.AtmosphericProfileModel,
em27_metadata.types.SensorDataContext]] = []

for alg in [ # type: ignore
"proffast-1.0", "proffast-2.2", "proffast-2.3"
]:
Expand All @@ -177,33 +180,35 @@ def _run(
alg,
atm,
)
# create session and run container
session = src.retrieval.session.create_session.run(
container_factory,
sdc,
retrieval_algorithm=alg,
atmospheric_profile_model=atm,
job_settings=src.types.config.RetrievalJobSettingsConfig(
# test this for all alg/atm combinations
# for one of the sensor data contexts
use_local_pressure_in_pcxs=(
sdc.from_datetime.date() == datetime.date(
2017, 6, 9
)
),
)
)
name = (
f"{session.ctn.container_id}:{alg}-{atm}-" +
f"{sdc.sensor_id}-{sdc.from_datetime.date()}"
)
p = multiprocessing.Process(
target=run_session,
args=(session, config, only_run_mock_retrieval),
name=name,
daemon=True,
)
pending_processes.append(p)
pending_jobs.append((alg, atm, sdc))

def create_process(
alg: src.types.RetrievalAlgorithm,
atm: src.types.AtmosphericProfileModel,
sdc: em27_metadata.types.SensorDataContext,
) -> multiprocessing.Process:
# create session and run container
session = src.retrieval.session.create_session.run(
container_factory,
sdc,
retrieval_algorithm=alg,
atmospheric_profile_model=atm,
job_settings=src.types.config.RetrievalJobSettingsConfig(
# test this for all alg/atm combinations
# for one of the sensor data contexts
use_local_pressure_in_pcxs=(
sdc.from_datetime.date() == datetime.date(2017, 6, 9)
),
)
)
return multiprocessing.Process(
target=run_session,
args=(session, config, only_run_mock_retrieval),
name=(
f"{session.ctn.container_id}:{alg}-{atm}-" +
f"{sdc.sensor_id}-{sdc.from_datetime.date()}"
),
)

active_processes: list[multiprocessing.Process] = []
finished_processes: list[multiprocessing.Process] = []
Expand All @@ -216,8 +221,9 @@ def _run(
# wait for all processes to finish
while True:
while ((len(active_processes) < process_count) and
(len(pending_processes) > 0)):
p = pending_processes.pop(0)
(len(pending_jobs) > 0)):
j = pending_jobs.pop(0)
p = create_process(*j)
print(f"Starting process {p.name}")
p.start()
active_processes.append(p)
Expand All @@ -236,15 +242,15 @@ def _run(
active_processes.remove(p)
container_factory.remove_container(p.name.split(":")[0])
finished_processes.append(p)
p.close()
print(f"Finished process {p.name}")

if len(active_processes) == 0 and len(pending_processes) == 0:
if len(active_processes) == 0 and len(pending_jobs) == 0:
break

time.sleep(1)
print("Waiting ...")
time.sleep(2)
print(
f"Pending | Active | Finished: {len(pending_processes)} |" +
f"Pending | Active | Finished: {len(pending_jobs)} |" +
f" {len(active_processes)} | {len(finished_processes)}"
)

Expand Down

0 comments on commit cea8892

Please sign in to comment.