Skip to content

Commit

Permalink
[ENH] Skip visits for which processed images exist (#1399)
Browse files Browse the repository at this point in the history
* Add a small dataclass for visits

* Implement small extract_visits function

* Modify Pipeline base class to always look for already processed visits

* Update AnatLinear pipeline to use new way

* Implement for PETLinear

* Add more control on file entities for the CAPS generator

* get_processed_images should query cropped or uncropped files depending on what the user is asking...

* Add unit tests for AnatLinear get_processed_images

* update docstring of build_caps_directory

* add more tests

* Enable the caps generator to generate fake pet-linear outputs

* add unit tests for pet-linear

* rename method and remove abstract decorator for now

* add method to query pet-linear transformation matrices

* Implement hashing and ordering for Visit class

* PetLinear considers visits processed if both image and transformation matrix are found

* T1Linear considers visits processed if both image and transformation matrix are found

* Remove unused import

* Allows PETLinear to check for PET images registered on T1w image

* Add test for handling pet images registered on T1w image

* Remove entity pet in the middle of the filename as this was done in the pipeline some time ago
  • Loading branch information
NicolasGensollen authored Dec 4, 2024
1 parent 592ae02 commit 7457f03
Show file tree
Hide file tree
Showing 24 changed files with 772 additions and 113 deletions.
49 changes: 35 additions & 14 deletions clinica/pipelines/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from nipype.interfaces.utility import IdentityInterface
from nipype.pipeline.engine import Node, Workflow

from clinica.utils.bids import Visit
from clinica.utils.check_dependency import SoftwareDependency, ThirdPartySoftware
from clinica.utils.group import GroupID, GroupLabel
from clinica.utils.stream import log_and_warn
Expand Down Expand Up @@ -593,6 +594,18 @@ def sessions(self, value: List[str]):
self._sessions = value
self.is_built = False

@property
def visits(self) -> list[Visit]:
    """Return the visits of the pipeline, one per (subject, session) pair.

    ``self.subjects`` and ``self.sessions`` are walked in lockstep, so the
    i-th visit is built from the i-th entry of each list. ``zip`` stops at
    the shorter of the two lists.
    """
    return [
        Visit(subject, session)
        for subject, session in zip(self.subjects, self.sessions)
    ]


@visits.setter
def visits(self, value: list[Visit]):
    """Replace the subjects and sessions of the pipeline from ``value``.

    Parameters
    ----------
    value : iterable of Visit
        The visits to keep. Each item must expose ``subject`` and ``session``.
        Any iterable is accepted, including one-shot iterators.
    """
    # Read both attributes in a single pass: iterating `value` twice would
    # leave `sessions` empty whenever `value` is a generator.
    pairs = [(visit.subject, visit.session) for visit in value]
    self.subjects = [subject for subject, _ in pairs]
    self.sessions = [session for _, session in pairs]

@property
def tsv_file(self) -> Optional[Path]:
    """Return the TSV file path stored in ``self._tsv_file`` (may be ``None``)."""
    return self._tsv_file
Expand All @@ -601,24 +614,31 @@ def tsv_file(self) -> Optional[Path]:
def info_file(self) -> Path:
return self._info_file

@staticmethod
def get_processed_images(
caps_directory: Path, subjects: List[str], sessions: List[str]
) -> List[str]:
"""Extract processed image IDs in `caps_directory` based on `subjects`_`sessions`.
def determine_subject_and_session_to_process(self):
    """Query expected output files in the CAPS folder in order to process only those missing.

    The visits already processed are obtained from ``self.get_processed_visits()``.
    If there are none, nothing is changed. Otherwise a ``UserWarning`` listing
    them is emitted through ``log_and_warn`` and ``self.visits`` is narrowed
    to the visits that are not in that list.
    """
    visits_already_processed = self.get_processed_visits()
    if not visits_already_processed:
        return

    # Imported lazily: only needed when there is something to report.
    from clinica.utils.stream import log_and_warn

    message = (
        f"In the provided CAPS folder {self.caps_directory}, Clinica found already processed "
        f"images for {len(visits_already_processed)} visit(s):\n- "
    )
    message += "\n- ".join(str(visit) for visit in visits_already_processed)
    message += "\nThose visits will be ignored by Clinica."
    log_and_warn(message, UserWarning)

    # Visits are hashable, so a set gives constant-time membership tests
    # instead of scanning the list once per visit.
    to_skip = set(visits_already_processed)
    self.visits = [visit for visit in self.visits if visit not in to_skip]

def get_processed_visits(self) -> list[Visit]:
    """Return the visits whose outputs already exist in the CAPS output folder.

    This default implementation reports no visit as processed and always
    returns an empty list; pipelines that can detect existing outputs
    provide their own version of this method.
    """
    return list()

def _init_nodes(self) -> None:
"""Init the basic workflow and I/O nodes necessary before build."""
Expand Down Expand Up @@ -691,6 +711,7 @@ def build(self):
self._check_dependencies()
self._check_pipeline_parameters()
if not self.has_input_connections():
self.determine_subject_and_session_to_process()
self._build_input_node()
self._build_core_nodes()
if not self.has_output_connections():
Expand Down
49 changes: 49 additions & 0 deletions clinica/pipelines/pet/linear/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Use hash instead of parameters for iterables folder names
# Otherwise path will be too long and generate OSError
from pathlib import Path
from typing import List

from nipype import config

from clinica.pipelines.pet.engine import PETPipeline
from clinica.utils.bids import Visit

cfg = dict(execution={"parameterize_dirs": False})
config.update_config(cfg)
Expand All @@ -30,6 +32,53 @@ def _check_custom_dependencies(self) -> None:
"""Check dependencies that can not be listed in the `info.json` file."""
pass

def get_processed_visits(self) -> list[Visit]:
    """Return a list of visits for which the pipeline is assumed to have run already.

    Before running the pipeline, for a given visit, if both the PET SUVR registered image
    and the rigid transformation files already exist, then the visit is added to this list.
    When the ``save_PETinT1w`` parameter is set, the PET image in T1w space must exist too.
    The pipeline will further skip these visits and run processing only for the remaining
    visits.

    Returns
    -------
    list of Visit
        The sorted visits having every expected output file. Empty if the
        CAPS directory does not exist.
    """
    if not self.caps_directory.is_dir():
        return []

    from clinica.utils.filemanip import extract_visits
    from clinica.utils.input_files import (
        pet_linear_nii,
        pet_linear_transformation_matrix,
    )
    from clinica.utils.inputs import clinica_file_reader

    def _visits_having(pattern) -> set:
        """Return the set of visits with a CAPS file matching ``pattern``."""
        files, _ = clinica_file_reader(
            self.subjects,
            self.sessions,
            self.caps_directory,
            pattern,
        )
        return set(extract_visits(files))

    tracer = self.parameters["acq_label"]
    visits_per_output = [
        _visits_having(
            pet_linear_nii(
                acq_label=tracer,
                suvr_reference_region=self.parameters["suvr_reference_region"],
                uncropped_image=self.parameters.get("uncropped_image", False),
            )
        ),
        _visits_having(pet_linear_transformation_matrix(tracer=tracer)),
    ]
    if self.parameters.get("save_PETinT1w", False):
        visits_per_output.append(
            _visits_having(pet_linear_nii(acq_label=tracer, space="T1w"))
        )
    # A visit counts as processed only if it appears for every expected output.
    return sorted(set.intersection(*visits_per_output))

def get_input_fields(self) -> List[str]:
"""Specify the list of possible inputs of this pipeline.
Expand Down
76 changes: 38 additions & 38 deletions clinica/pipelines/t1_linear/anat_linear_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from nipype import config

from clinica.pipelines.engine import Pipeline
from clinica.utils.bids import Visit
from clinica.utils.check_dependency import ThirdPartySoftware
from clinica.utils.stream import log_and_warn

Expand Down Expand Up @@ -67,24 +68,46 @@ def __init__(
caps_name=caps_name,
)

@staticmethod
def get_processed_images(
caps_directory: Path, subjects: List[str], sessions: List[str]
) -> List[str]:
from clinica.utils.filemanip import extract_image_ids
from clinica.utils.input_files import T1W_LINEAR_CROPPED
def get_processed_visits(self) -> list[Visit]:
    """Return a list of visits for which the pipeline is assumed to have run already.

    Before running the pipeline, for a given visit, if both the T1w image registered
    to the MNI152NLin2009cSym template and the affine transformation estimated with ANTs
    already exist, then the visit is added to this list.
    The cropped or uncropped image is looked for depending on the ``uncropped_image``
    parameter (cropped when the parameter is absent).
    The pipeline will further skip these visits and run processing only for the remaining
    visits.

    Returns
    -------
    list of Visit
        The sorted visits having both files. Empty if the CAPS directory
        does not exist.
    """
    if not self.caps_directory.is_dir():
        return []

    from clinica.utils.filemanip import extract_visits
    from clinica.utils.input_files import (
        T1W_LINEAR,
        T1W_LINEAR_CROPPED,
        T1W_TO_MNI_TRANSFORM,
    )
    from clinica.utils.inputs import clinica_file_reader

    image_pattern = (
        T1W_LINEAR
        if self.parameters.get("uncropped_image", False)
        else T1W_LINEAR_CROPPED
    )
    images, _ = clinica_file_reader(
        self.subjects,
        self.sessions,
        self.caps_directory,
        image_pattern,
    )
    visits_having_image = set(extract_visits(images))
    transformations, _ = clinica_file_reader(
        self.subjects,
        self.sessions,
        self.caps_directory,
        T1W_TO_MNI_TRANSFORM,
    )
    visits_having_transformation = set(extract_visits(transformations))
    # A visit counts as processed only if both outputs were found.
    return sorted(visits_having_image & visits_having_transformation)

def _check_custom_dependencies(self) -> None:
"""Check dependencies that can not be listed in the `info.json` file."""
Expand Down Expand Up @@ -119,8 +142,6 @@ def _build_input_node(self):
import nipype.interfaces.utility as nutil
import nipype.pipeline.engine as npe

from clinica.utils.exceptions import ClinicaBIDSError, ClinicaException
from clinica.utils.filemanip import extract_subjects_sessions_from_filename
from clinica.utils.image import get_mni_template
from clinica.utils.input_files import T1W_NII, Flair_T2W_NII
from clinica.utils.inputs import clinica_file_filter
Expand All @@ -131,27 +152,6 @@ def _build_input_node(self):
"t1" if self.name == "t1-linear" else "flair"
)

# Display image(s) already present in CAPS folder
# ===============================================
processed_ids = self.get_processed_images(
self.caps_directory, self.subjects, self.sessions
)
if len(processed_ids) > 0:
cprint(
msg=f"Clinica found {len(processed_ids)} image(s) already processed in CAPS directory:",
lvl="warning",
)
for image_id in processed_ids:
cprint(msg=f"{image_id.replace('_', ' | ')}", lvl="warning")
cprint(msg=f"Image(s) will be ignored by Clinica.", lvl="warning")
input_ids = [
f"{p_id}_{s_id}" for p_id, s_id in zip(self.subjects, self.sessions)
]
to_process_ids = list(set(input_ids) - set(processed_ids))
self.subjects, self.sessions = extract_subjects_sessions_from_filename(
to_process_ids
)

# Inputs from anat/ folder
# ========================
# anat image file:
Expand Down
20 changes: 20 additions & 0 deletions clinica/utils/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,31 @@
"BIDS_VERSION",
"Extension",
"Suffix",
"Visit",
]

BIDS_VERSION = Version("1.7.0")


@dataclass(frozen=True, order=True)
class Visit:
    """Immutable (subject, session) pair identifying one visit.

    Instances are hashable (frozen dataclass) and ordered by subject first,
    then by session. The ordering methods are generated by the dataclass
    machinery, so ``<``, ``<=``, ``>`` and ``>=`` are all available and
    consistent with each other.
    """

    # Subject identifier; first sort key.
    subject: str
    # Session identifier; second sort key.
    session: str

    def __str__(self) -> str:
        """Return the subject and the session separated by a single space."""
        return f"{self.subject} {self.session}"


class Extension(str, Enum):
"""Possible extensions in BIDS file names."""

Expand Down
10 changes: 10 additions & 0 deletions clinica/utils/filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
from pathlib import Path
from typing import Callable, List, Optional, Union

from .bids import Visit

__all__ = [
"UserProvidedPath",
"delete_directories",
"delete_directories_task",
"extract_crash_files_from_log_file",
"extract_image_ids",
"extract_visits",
"extract_metadata_from_json",
"extract_subjects_sessions_from_filename",
"get_filename_no_ext",
Expand Down Expand Up @@ -365,6 +368,13 @@ def extract_image_ids(bids_or_caps_files: list[str]) -> list[str]:
return id_bids_or_caps_files


def extract_visits(bids_or_caps_files: list[str]) -> list[Visit]:
    """Build one ``Visit`` per BIDS or CAPS file.

    The image identifiers are obtained with ``extract_image_ids``; each one is
    split on underscores and the resulting pieces are passed positionally to
    ``Visit`` (presumably a subject and a session, i.e. exactly two pieces;
    verify against ``extract_image_ids``).
    """
    identifiers = extract_image_ids(bids_or_caps_files)
    return [Visit(*identifier.split("_")) for identifier in identifiers]


def extract_subjects_sessions_from_filename(
bids_or_caps_files: list[str],
) -> tuple[list[str], list[str]]:
Expand Down
Loading

0 comments on commit 7457f03

Please sign in to comment.