Pipelines
This section describes the pipeline for extracting event logs and explains the functionality of the individual modules within it. These modules are located in the tracex_project/extraction/logic/modules directory and do the core work of processing and analyzing the data.
The pipeline consists of several steps, from identifying the data sources to saving the extracted data. The following sections describe each module in detail: what it does and how it is used within the pipeline.
The Patient Journey Generation Pipeline is a framework for generating synthetic COVID-19 patient journeys.
The pipeline randomizes key attributes to build a unique persona for each generated patient journey:
- Sex: Male or female
- Nationality: Randomly selected from a predefined list of countries
- Key dates: Chosen to establish a timeline for the persona's journey
These randomized elements are combined to produce a short biography of the persona, providing context for the patient journey.
Using the crafted persona and biography as a foundation, the system then generates a synthetic patient journey specific to the COVID-19 pandemic. The journey incorporates the persona's attributes and follows a realistic disease progression and interaction with the healthcare system based on established models and data on COVID-19 cases. The generated patient journeys can be used for modeling, analysis, training, or other purposes where synthetic COVID-19 case data is required, while maintaining patient privacy.
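As an illustration, the persona randomization could look like the following sketch. This is a minimal sketch only: the attribute names, the country list, and the date range are assumptions for illustration, not the exact values TraceX uses.
import random
from dataclasses import dataclass
from datetime import date, timedelta

# Hypothetical value pools; the actual lists used by TraceX may differ.
SEXES = ["male", "female"]
COUNTRIES = ["Germany", "United Kingdom", "France", "Spain", "Italy"]

@dataclass
class Persona:
    sex: str
    nationality: str
    infection_date: date

def generate_persona() -> Persona:
    """Randomize the key attributes that make each patient journey unique."""
    # Pick a random infection date within the first two years of the pandemic.
    start = date(2020, 3, 1)
    infection_date = start + timedelta(days=random.randint(0, 730))
    return Persona(
        sex=random.choice(SEXES),
        nationality=random.choice(COUNTRIES),
        infection_date=infection_date,
    )

persona = generate_persona()
biography = (
    f"A {persona.sex} patient from {persona.nationality} "
    f"who first noticed symptoms on {persona.infection_date.isoformat()}."
)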
The extraction pipeline is then used to extract the event log from the patient journey.
The main steps of the pipeline include:
- Extracting activity labels
- Adding start dates, end dates, and durations
- Adding event types
- Adding locations
Additionally, we can extract cohorts and measure metrics.
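These steps successively enrich one shared DataFrame. For illustration, an extracted event log could look like the following sketch. The activity, time:timestamp, and time:end_timestamp column names appear in the code excerpts below, while the duration, event_type, and location column names are assumptions; all values are made up.
import pandas as pd

# Illustrative event log after all pipeline steps (all values are made up).
event_log = pd.DataFrame(
    {
        "activity": ["noticing fatigue and joint pains", "visiting the doctor"],
        "time:timestamp": pd.to_datetime(
            ["20230201T0000", "20230205T0900"], format="%Y%m%dT%H%M"
        ),
        "time:end_timestamp": pd.to_datetime(
            ["20230201T0000", "20230205T1000"], format="%Y%m%dT%H%M"
        ),
        "duration": ["00:00:00", "01:00:00"],
        "event_type": ["Symptom Onset", "Doctor visit"],
        "location": ["Home", "Doctors"],
    }
)
print(event_log)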
File path: tracex_project/extraction/logic/modules/module_cohort_tagger.py
Class: CohortTagger
This is the module that extracts the cohort information from the patient journey. The cohort tags are condition, age, biological sex, origin and preexisting condition.
Detailed functions:
def __extract_cohort_tags(patient_journey) -> Dict[str, str]:
    """Extracts information about condition, sex, age, origin and preexisting condition."""
    cohort_data = {}
    for message_list in Prompt.objects.get(name="COHORT_TAG_MESSAGES").text:
        messages = message_list[1:]
        messages.append(
            {"role": "user", "content": patient_journey},
        )
        tag = u.query_gpt(messages)
        cohort_data[message_list[0]] = tag
    return cohort_data
Example: Given a patient journey, the output could look like this:
{'condition': 'Rheumatoid arthritis', 'sex': 'female', 'age': '30', 'origin': 'United Kingdom', 'preexisting_condition': 'Heart disease'}
If we find nothing in the patient journey for a cohort tag, we populate that tag with N/A; a function named __remove_placeholder was written for this purpose. Assuming the patient journey above contained no mention of pre-existing conditions, the output would look like this:
{'condition': 'Rheumatoid arthritis', 'sex': 'female', 'age': '30', 'origin': 'United Kingdom', 'preexisting_condition': 'N/A'}
If no cohort information can be found in the patient journey at all, None is returned instead.
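The implementation of __remove_placeholder is not shown on this page; a minimal sketch of the behaviour described above could look like this (the set of placeholder strings is an assumption):
def __remove_placeholder(cohort_data: dict) -> dict | None:
    """Normalize missing cohort tags to 'N/A'; return None if nothing was found.

    A sketch based on the behaviour described above; the real implementation
    in TraceX may differ.
    """
    # Hypothetical placeholder strings a model might return for "not found".
    placeholders = {"", "None", "none", "unknown"}
    cleaned = {
        key: ("N/A" if value.strip() in placeholders else value)
        for key, value in cohort_data.items()
    }
    # If no cohort tag carries real information, report no cohort at all.
    if all(value == "N/A" for value in cleaned.values()):
        return None
    return cleaned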
File Path: tracex_project/extraction/logic/modules/module_activity_labeler.py
Class: ActivityLabeler
This module extracts activity labels from the patient journey, identifying the specific activities that make up the event log. It returns a DataFrame containing the extracted activity labels.
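For illustration, the output could look like the following sketch. The exact column set in TraceX may differ; the activity and sentence_id columns are the ones referenced by the TimeExtractor below, and the values are made up.
import pandas as pd

# Illustrative ActivityLabeler output: one row per extracted activity,
# with the id of the sentence in which the activity was found.
df = pd.DataFrame(
    {
        "activity": ["noticing fatigue and joint pains", "visiting the doctor"],
        "sentence_id": [0, 3],
    }
)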
File Path: tracex_project/extraction/logic/modules/module_time_extractor.py
Class: TimeExtractor
This module extracts time information from the patient journey, including start dates, end dates, and durations. It returns the resulting DataFrame, which contains the extracted start, end, and duration times in the correct format.
Detailed Functions:
def __extract_start_date(self, row: pd.Series) -> str:
    """Extract the start date for a given activity."""
    lower, upper = u.get_snippet_bounds(
        index=(int(row["sentence_id"])), length=len(self.patient_journey_sentences)
    )
    patient_journey_snippet = ". ".join(self.patient_journey_sentences[lower:upper])
    messages = Prompt.objects.get(name="START_DATE_MESSAGES").text
    messages.append(
        {
            "role": "user",
            "content": "Text: "
            + patient_journey_snippet
            + "\nActivity label: "
            + row["activity"],
        }
    )
    start = u.query_gpt(messages)
    return start
The method determines the bounds of a text snippet surrounding the sentence of interest by using the sentence_id from the row and the total number of sentences in the patient's journey. With these bounds, it creates a snippet of text that includes relevant context around the activity.
We then use the snippet to query a GPT model, which processes the context and extracts the start date for the activity. Finally, the method returns the start date obtained from the GPT model.
For example, for the patient journey given above, the activity noticing fatigue and joint pains would return 20230201T0000.
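The helper u.get_snippet_bounds is not shown on this page; a minimal sketch of what such a helper might do (the window of two sentences on each side is an assumption):
def get_snippet_bounds(index: int, length: int) -> tuple[int, int]:
    """Return (lower, upper) sentence indices around a sentence of interest,
    clamped to the bounds of the patient journey."""
    context = 2  # hypothetical number of context sentences on each side
    lower = max(0, index - context)
    upper = min(length, index + context + 1)
    return lower, upper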
The end date is also extracted using the same principle.
The __calculate_duration method is a static method that calculates the duration of an activity based on its start and end timestamps. It operates on a row from a DataFrame, which contains the timestamps for each activity.
@staticmethod
def __calculate_duration(row: pd.Series) -> str:
    """Calculate the duration of an activity."""
    duration = row["time:end_timestamp"] - row["time:timestamp"]
    hours, remainder = divmod(duration.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
The method computes the duration by subtracting the start timestamp (time:timestamp) from the end timestamp (time:end_timestamp). This gives a timedelta object representing the duration of the activity.
Next, the method converts this duration into total seconds and uses the divmod function to break it down into hours, minutes, and seconds. The divmod function first divides the total seconds by 3600 (the number of seconds in an hour) to get the hours and the remainder of seconds. Then, it divides the remainder by 60 to get the minutes and seconds.
Finally, the method returns the duration formatted as a string in the "HH:MM:SS" format, ensuring that each unit is always represented by two digits for consistency and readability.
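As a worked example, an activity starting at 09:00 and ending at 11:30 on the same day yields 02:30:00:
import pandas as pd

start = pd.Timestamp("2023-02-01 09:00")
end = pd.Timestamp("2023-02-01 11:30")
duration = end - start  # Timedelta of 2 hours 30 minutes

hours, remainder = divmod(duration.total_seconds(), 3600)  # 2.0, 1800.0
minutes, seconds = divmod(remainder, 60)                   # 30.0, 0.0
print(f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}")  # 02:30:00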
The __post_processing method is designed to clean and fill missing values in the timestamps within a DataFrame. This method ensures that all date columns are properly formatted and any gaps in the data are handled appropriately.
The method defines several helper functions to carry out specific tasks:
Convert to Datetime:
def convert_to_datetime(df: pd.DataFrame, column: str) -> pd.DataFrame:
    df[column] = pd.to_datetime(df[column], format="%Y%m%dT%H%M", errors="coerce")
    return df
This function converts the specified column of the DataFrame to datetime format. It expects timestamps in the %Y%m%dT%H%M format (e.g. 20230201T0000) and coerces any values that fail to parse into NaT.
Set Default Date if NA:
def set_default_date_if_na(df: pd.DataFrame, column: str) -> pd.DataFrame:
    if df[column].isna().all():
        df[column] = df[column].fillna(pd.Timestamp("2020-01-01 00:00"))
    return df
This function sets a default date of "2020-01-01 00:00" for the entire column if all values in the column are missing (NA).
Fill Missing Values:
def fill_missing_values(df: pd.DataFrame, column: str) -> pd.DataFrame:
    df[column] = df[column].ffill().bfill()
    return df
This function fills missing values in the column by first carrying the last valid observation forward (forward fill) and then carrying the next valid observation backward (backward fill).
Fix End Dates:
def fix_end_dates(row: pd.Series) -> pd.Series:
    if row["time:end_timestamp"] is pd.NaT and row["time:timestamp"] is not pd.NaT:
        row["time:end_timestamp"] = row["time:timestamp"]
    return row
This function ensures that if the end timestamp is missing but the start timestamp is present, the end timestamp is set to be the same as the start timestamp.
In summary, the __post_processing method systematically ensures that all timestamps in the DataFrame are correctly formatted, default values are set where necessary, and any remaining missing values are filled, resulting in a complete and consistent dataset.
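Put together, the post-processing could be applied roughly like this. This is a sketch that follows the description above, using the helper functions just shown; the actual orchestration inside __post_processing may differ.
import pandas as pd

def post_process(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the helper functions defined above to both timestamp columns."""
    for column in ("time:timestamp", "time:end_timestamp"):
        df = convert_to_datetime(df, column)
        df = set_default_date_if_na(df, column)
        df = fill_missing_values(df, column)
    # Align end dates with start dates where only the end is missing.
    df = df.apply(fix_end_dates, axis=1)
    return df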
File Path: tracex_project/extraction/logic/modules/module_event_type_classifier.py
Class: EventTypeClassifier
This module classifies the event types of the activities. The given event types are 'Symptom Onset', 'Symptom Offset', 'Diagnosis', 'Doctor visit', 'Treatment', 'Hospital admission', 'Hospital discharge', 'Medication', 'Lifestyle Change' and 'Feelings'. This yields a standardized set of event types from the patient journey, which is necessary for the application of process mining algorithms. After classification, the module returns a DataFrame enriched with the extracted event types.
Detailed Functions:
def __classify_event_type(activity_label):
    """Classify the event type for a given activity."""
    messages = Prompt.objects.get(name="EVENT_TYPE_MESSAGES").text
    messages.append({"role": "user", "content": activity_label})
    event_type = u.query_gpt(messages)
    return event_type
The __classify_event_type method retrieves a message template, appends the activity label to be classified, queries the GPT model to determine the event type, and returns the classification.
For example, the activity noticing fatigue and joint pains would return Symptom Onset, which we would use to enrich our DataFrame.
File Path: tracex_project/extraction/logic/modules/module_location_extractor.py
Class: LocationExtractor
This is the module that extracts the location information for each activity in the patient journey. Each activity is classified into one of the given locations: "Home", "Hospital", or "Doctors". After classification, the module returns a DataFrame enriched with the extracted locations.
Detailed Functions:
def __classify_location(activity_label: str) -> str:
    """Classify the location for a given activity."""
    messages = Prompt.objects.get(name="LOCATION_MESSAGES").text
    messages.append({"role": "user", "content": activity_label})
    location = u.query_gpt(messages)
    return location
The __classify_location method retrieves a message template, appends the activity label to be classified, queries the GPT model to determine the location, and returns the classification.
For example, the activity noting improvement during follow-up appointment would return Doctors, which we would use to enrich our DataFrame.
We can also measure metrics; unlike the other modules, this module does not return a DataFrame.
File path: tracex_project/extraction/logic/modules/module_metrics_analyzer.py
Class: MetricsAnalyzer
This is the module that runs metrics on the pipeline's output. The metrics currently used are:
- relevance of event information
- correctness of timestamps
Detailed functions:
@staticmethod
def __rate_activity_relevance(activity: str, condition: str | None) -> str:
    category_mapping = {
        "No Relevance": 0,
        "Low Relevance": 1,
        "Moderate Relevance": 2,
        "High Relevance": 3,
    }
    messages = Prompt.objects.get(name="METRIC_ACTIVITY_MESSAGES").text
    if condition is not None:
        messages.append(
            {
                "role": "user",
                "content": activity
                + "\n\nRate the activity relevance in the context of the course of disease: "
                + condition,
            }
        )
    else:
        messages.append({"role": "user", "content": activity})
    response = u.query_gpt(messages)
    category = "No Relevance"  # By default, an activity is not relevant.
    for key in category_mapping:
        if key in response:
            category = key
            break
    return category
The __rate_activity_relevance method is a static method designed to rate the relevance of a given activity in the context of a specified condition. It assigns a relevance category based on the response from a GPT model. The method begins by defining a dictionary that maps relevance categories to numerical values. It then initializes the category as "No Relevance" and checks the response for any of the predefined relevance categories; if one is found, it is assigned as the relevance category.
For example, when examining a patient journey where the condition is Covid-19, the activity noticing fatigue and joint pains would return High Relevance, while the activity painting a picture would return No Relevance.
The __rate_timestamps_correctness method evaluates the correctness of start and end timestamps for a given activity within the patient journey.
def __rate_timestamps_correctness(
    self, activity: str, start, end
) -> Tuple[str, float]:
    messages = Prompt.objects.get(name="METRIC_TIMESTAMP_MESSAGES").text
    messages.append(
        {
            "role": "user",
            "content": (
                f"Text: {self.patient_journey}\nActivity: {activity}\n\
                Start date: {start}\nEnd date: {end}\n"
            ),
        }
    )
    timestamp_correctness, linear_probability = u.query_gpt(
        messages, return_linear_probability=True, top_logprobs=1
    )
    return timestamp_correctness, linear_probability
This method takes the activity, its timestamps, and the entire patient journey, queries the GPT model for an evaluation, and returns the correctness assessment (true or false) together with a linear probability indicating how confident the model is in its answer.
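The linear probability is presumably the log-probability of the model's answer token converted back into a plain probability (top_logprobs=1 requests the log-probability of the most likely token). A minimal sketch of that conversion; the internals of u.query_gpt are not shown on this page:
import math

def to_linear_probability(logprob: float) -> float:
    """Convert a token log-probability (natural log) into a probability in [0, 1]."""
    return math.exp(logprob)

# A logprob of -0.01 corresponds to ~0.99 confidence in the answer token.
print(round(to_linear_probability(-0.01), 2))  # 0.99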
For example:
- Activity: I went to the doctor on the 1st of January.
- Start date: 01/01/2024
- End date: 01/01/2024
- Patient journey: I went to the doctor on the 1st of January and was diagnosed with Covid-19.
This would return: true, 1