Skip to content

Commit

Permalink
Do not run jobs in parallel in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Jan 31, 2024
1 parent cea8892 commit ceba67a
Showing 1 changed file with 104 additions and 86 deletions.
190 changes: 104 additions & 86 deletions tests/retrieval/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,106 +154,124 @@ def _run(
src.retrieval.utils.retrieval_status.RetrievalStatusList.reset()
assert config.retrieval is not None

NUMBER_OF_JOBS = len(SENSOR_DATA_CONTEXTS) * 2 * 3
print(f"Running {NUMBER_OF_JOBS} retrieval jobs in parallel")

atm: src.types.AtmosphericProfileModel
alg: src.types.RetrievalAlgorithm

# start proffast 1.0 first because it takes the longest
pending_jobs: list[tuple[src.types.RetrievalAlgorithm,
src.types.AtmosphericProfileModel,
em27_metadata.types.SensorDataContext]] = []

for alg in [ # type: ignore
"proffast-1.0", "proffast-2.2", "proffast-2.3"
]:
for atm in [ # type: ignore
"GGG2014", "GGG2020"
]:
# start proffast 1.0 first because it takes the longest
for alg in ["proffast-1.0", "proffast-2.2", "proffast-2.3"]:
for atm in ["GGG2014", "GGG2020"]:
if alg == "proffast-1.0" and atm == "GGG2020":
continue
for sdc in SENSOR_DATA_CONTEXTS:
# set up container factory
src.retrieval.utils.retrieval_status.RetrievalStatusList.add_items(
[sdc],
alg,
atm,
alg, # type: ignore
atm, # type: ignore
)
pending_jobs.append((alg, atm, sdc))

def create_process(
alg: src.types.RetrievalAlgorithm,
atm: src.types.AtmosphericProfileModel,
sdc: em27_metadata.types.SensorDataContext,
) -> multiprocessing.Process:
# create session and run container
session = src.retrieval.session.create_session.run(
container_factory,
sdc,
retrieval_algorithm=alg,
atmospheric_profile_model=atm,
job_settings=src.types.config.RetrievalJobSettingsConfig(
# test this for all alg/atm combinations
# for one of the sensor data contexts
use_local_pressure_in_pcxs=(
sdc.from_datetime.date() == datetime.date(2017, 6, 9)
),
)
)
return multiprocessing.Process(
target=run_session,
args=(session, config, only_run_mock_retrieval),
name=(
f"{session.ctn.container_id}:{alg}-{atm}-" +
f"{sdc.sensor_id}-{sdc.from_datetime.date()}"
),
)
pending_jobs.append((alg, atm, sdc)) # type: ignore

active_processes: list[multiprocessing.Process] = []
finished_processes: list[multiprocessing.Process] = []

cpu_count = multiprocessing.cpu_count()
print(f"Detected {cpu_count} CPU cores")
process_count = max(1, cpu_count - 1)
print(f"Running {process_count} processes in parallel")

# wait for all processes to finish
while True:
while ((len(active_processes) < process_count) and
(len(pending_jobs) > 0)):
j = pending_jobs.pop(0)
p = create_process(*j)
print(f"Starting process {p.name}")
p.start()
active_processes.append(p)
print(f"Started process {p.name}")

time.sleep(0.5)

newly_finished_processes: list[multiprocessing.Process] = []
for p in active_processes:
if not p.is_alive():
newly_finished_processes.append(p)

for p in newly_finished_processes:
print(f"Joining process {p.name}")
p.join()
active_processes.remove(p)
container_factory.remove_container(p.name.split(":")[0])
finished_processes.append(p)
p.close()
print(f"Finished process {p.name}")

if len(active_processes) == 0 and len(pending_jobs) == 0:
break

time.sleep(2)
print(f"Jobs ({len(pending_jobs)}):")
for i, j in enumerate(pending_jobs):
print(
f"Pending | Active | Finished: {len(pending_jobs)} |" +
f" {len(active_processes)} | {len(finished_processes)}"
f" #{i}: {j[0]} | {j[1]} | {j[2].sensor_id} | {j[2].from_datetime.date()}"
)

ci_env_var = os.getenv("CI", "not set")
print(f'Environment variable "CI" = {ci_env_var}')
run_parallel = (ci_env_var not in ["true", "True", "1", True])

if run_parallel:
active_processes: list[multiprocessing.Process] = []
finished_processes: list[multiprocessing.Process] = []

cpu_count = multiprocessing.cpu_count()
print(f"Detected {cpu_count} CPU cores")
process_count = max(1, cpu_count - 1)
print(f"Running {process_count} processes in parallel")

# wait for all processes to finish
while True:
while ((len(active_processes) < process_count) and
(len(pending_jobs) > 0)):

j = pending_jobs.pop(0)
print(f"Spinning up new session")
session = src.retrieval.session.create_session.run(
container_factory,
j[2],
retrieval_algorithm=j[0],
atmospheric_profile_model=j[1],
job_settings=src.types.config.RetrievalJobSettingsConfig(
# test this for all alg/atm combinations
# for one of the sensor data contexts
use_local_pressure_in_pcxs=(
j[2].from_datetime.date() == datetime.date(
2017, 6, 9
)
),
)
)
print(f"Creating new process")
p = multiprocessing.Process(
target=run_session,
args=(session, config, only_run_mock_retrieval),
name=(
f"{session.ctn.container_id}:{alg}-{atm}-" +
f"{sdc.sensor_id}-{sdc.from_datetime.date()}"
),
)
print(f"Starting process {p.name}")
p.start()
active_processes.append(p)
print(f"Started process {p.name}")

time.sleep(0.5)

newly_finished_processes: list[multiprocessing.Process] = []
for p in active_processes:
if not p.is_alive():
newly_finished_processes.append(p)

for p in newly_finished_processes:
print(f"Joining process {p.name}")
p.join()
active_processes.remove(p)
container_factory.remove_container(p.name.split(":")[0])
finished_processes.append(p)
p.close()
print(f"Finished process {p.name}")

if len(active_processes) == 0 and len(pending_jobs) == 0:
break

time.sleep(2)
print(
f"Pending | Active | Finished: {len(pending_jobs)} |" +
f" {len(active_processes)} | {len(finished_processes)}"
)
else:
print("Running in serial mode")
for i, j in enumerate(pending_jobs):
print(f"#{i}: Spinning up new session")
session = src.retrieval.session.create_session.run(
container_factory,
j[2],
retrieval_algorithm=j[0],
atmospheric_profile_model=j[1],
job_settings=src.types.config.RetrievalJobSettingsConfig(
# test this for all alg/atm combinations
# for one of the sensor data contexts
use_local_pressure_in_pcxs=(
j[2].from_datetime.date() == datetime.date(2017, 6, 9)
),
)
)
print(f"#{i}: Running session")
run_session(session, config, only_run_mock_retrieval)
print(f"#{i}: Finished session")
container_factory.remove_container(session.ctn.container_id)


def _point_config_to_test_data(config: src.types.Config) -> None:
config.general.data.datalogger.root = os.path.join(
Expand Down

0 comments on commit ceba67a

Please sign in to comment.