Skip to content

Pipelines

Thang edited this page May 31, 2024 · 54 revisions

Introduction

This section describes the pipeline for extracting event logs. Furthermore, it explains the functionality of the individual modules within the pipeline. These modules are located in the tracex_project/extraction/logic/modules directory and are crucial for processing and analyzing the data.

The pipeline for extracting event logs consists of several steps, from identifying the data sources to saving the extracted data. This section provides a detailed and understandable description of the individual modules used to extract and process event logs. It includes an explanation of the functionality of each module and its use within the pipeline.

Overview

image

Patient Journey Generator Pipeline

The Patient Journey Generation Pipeline is a framework for generating synthetic COVID-19 patient journeys.

generation

Persona Generation

The pipeline randomizes key attributes to build a unique persona for each generated patient journey:

  • Sex: Male or female
  • Nationality: Randomly selected from a predefined list of countries
  • Key dates: Chosen to establish a timeline for the persona's journey

These randomized elements are combined to produce a short biography of the persona, providing context for the patient journey.

Patient Journey Creation

Using the crafted persona and biography as a foundation, the system then generates a synthetic patient journey specific to the COVID-19 pandemic. The journey incorporates the persona's attributes and follows a COVID-19 disease progression and interaction with healthcare actors. The generated patient journeys can be directly used as an input for the extraction pipeline.

Preprocessing Pipeline:

This pipeline is ultimately used to perform data cleaning, ensuring that the input data is accurate, consistent, and well-structured, which is crucial for reliable downstream analysis and processing.

preprocessor

The steps in our preprocessor pipeline inlucding

  1. Check Spelling: Corrects spelling errors to ensure textual data is accurate.
  2. Check Punctuation: Ensures proper punctuation to improve the readability and structure of the text.
  3. Identify Timestamps: Detects and marks timestamps within the text for further processing.
  4. Transform Timestamps: Converts timestamps into a standardized format.
  5. Interpret Timestamps: Understands the context and significance of the timestamps.
  6. Calculate Timestamps: Computes relevant time-based metrics from the timestamps.

Preprocessing Patient Journey

File Path: tracex_project/extraction/logic/modules/module_patient_journey_preprocessor.py

Class: Preprocessor

This class provides functions for preprocessing the patient input to enhance data quality and interpretability. We do each preprocessing step with queries to gpt.

These steps include spellcheck, punctuation correction, and various time-related identifications and adjustments. Each step is applied by calling the private method __apply_preprocessing_step

@staticmethod
def __apply_preprocessing_step(text: str, prompt_name: str) -> str:
    """Applies a preprocessing step based on the step name."""
    messages = Prompt.objects.get(name=f"PREPROCESSING_{prompt_name}").text
    new_user_message = {"role": "user", "content": text}
    messages.append(new_user_message)
    preprocessed_text = u.query_gpt(messages)

    return preprocessed_text

For example giving the Patient Journey

I (30f) started noticing unusual fatigue, and joint pains in early Febuary 2023. I initially brushed it off as overwork since I always had something with my heart, However as the symptons persisted, I desided to see a doctor in Sevenoaks two weeks later. Blood tests were conducted and exactly one month after my initial visit, I was diagnosed with rhumatoid arthritis. Following the diagnosis my treatment began on April, 13, focusing on managing the symptoms. A significant improvement was noted during a follow-up appoinment three months after the start of the treatment.

The original patient journey is often filled with grammar and spelling errors and difficult-to-read timestamps, making it much harder for the LLM to understand. By transforming the patient journey using GPT, we obtain a preprocessed version that is easier to read and more suitable for extracting the event log with our extraction pipeline.

Checking for spelling and punctuation is just a query to the GPT model; however, dealing with timestamps is a bigger challenge.

Our Patient Journey after the spelling and checking would look like this:

I (30f) started noticing unusual fatigue, and joint pain in early February 2023. I initially brushed it off as overwork since I always had something with my heart, However as the symptoms persisted, I decided to see a doctor in Sevenoaks two weeks later. Blood tests were conducted and exactly one month after my initial visit, I was diagnosed with rheumatoid arthritis. Following the diagnosis my treatment began on April, 13, focusing on managing the symptoms. A significant improvement was noted during a follow-up appointment three months after the start of the treatment.

However, to transform certain timestamps, for example, changing

I (30f) started noticing unusual fatigue and joint pain in early February 2023 to I (30f) started noticing unusual fatigue and joint pain on 2023/02/01. we use several steps performed one after another.

We need to identify the timestamps in the patient journey and mark them for further transformation. We are doing this because GPT, as of now (2024/05/31), is better at performing one step at a time.

For e.g.

I (30f) started noticing unusual fatigue, and joint pain $$$in early February 2023$$$.

We would transform the timestamp marked with $$$. This ensures that the model works more efficiently because we have divided the steps. So that we would get I (30f) started noticing unusual fatigue and joint pain on 2023/02/01.

A further challenge in real-world patient journeys is that people also write timestamps in relation to other events.

For example: I went to the hospital on 2023/04/02. After 3 weeks, I finally could go home.

Since we want our extraction pipeline to have an easier time later on, it's better to transform these related timestamps as well. That is why we included prompts to calculate those timestamps.

So our output after Calculate Timestamps would be

I went to the hospital on 2023/04/02. And on the 2023/04/23, I finally could go home.

Extraction Pipeline:

This pipeline is ultimately used to perform the extraction of the patient journey.

image

The main steps of the pipeline include:

  1. Extracting cohort information
  2. Extracting activity labels
  3. Adding start dates, end dates, and durations (optional)
  4. Adding event types (optional)
  5. Adding locations (optional)

Additionally, we can also measure metrics.

If we for example don't choose to extract the timestamps, they would be filled with N/A in the event log.

Module description

Extract cohort tags

File path: tracex_project/extraction/logic/modules/module_cohort_tagger.py

Class: CohortTagger

This is the module that extracts the cohort information from the patient journey. The cohort tags are condition, age, biological sex, origin and preexisting condition.

Detailed functions:

def __extract_cohort_tags(patient_journey) -> Dict[str, str]:
    """Extracts information about condition, sex, age, origin and preexisting condition."""
    cohort_data = {}
    for message_list in Prompt.objects.get(name="COHORT_TAG_MESSAGES").text:
        messages = message_list[1:]
        messages.append(
            {"role": "user", "content": patient_journey},
        )
        tag = u.query_gpt(messages)
        cohort_data[message_list[0]] = tag

    return cohort_data

Example: Giving a Patient Journey.

Our output could look like this:

{'condition': 'Rheumatoid arthritis', 'sex': 'female', 'age': '30', 'origin': 'United Kingdom', 'preexisting_condition': 'Heart disease'}

If we find nothing in a patient journey for a cohort, we would populate this cohort with N/A. A function named __remove_placeholder was written for this purpose. Assuming the above patient journey, we would not have found any pre-existing conditions in the text.

Then our output would look like this:

{'condition': 'Rheumatoid arthritis', 'sex': 'female', 'age': '30', 'origin': 'United Kingdom', 'preexisting_condition': 'N/A'}

If we could not find a cohort in the patient journey, we would return none in this case.


Extract activity labels

File Path: tracex_project/extraction/logic/modules/module_activity_labeler.py

Class: ActivityLabeler

This module extracts activity labels from the data to identify specific activities within the event logs. It returns the resulting DataFrame, which contains the extracted activity labels.


Add Start Dates, End Dates, and Durations

File Path: tracex_project/extraction/logic/modules/module_time_extractor.py

Class: TimeExtractor

This module extracts time information from the patient journey, including start dates, end dates, and durations. It returns the resulting DataFrame, which contains the extracted start, end, and duration times in the correct format.

Detailed Functions:

def __extract_start_date(self, row: pd.Series) -> str:
    """Extract the start date for a given activity."""
    lower, upper = u.get_snippet_bounds(
        index=(int(row["sentence_id"])), length=len(self.patient_journey_sentences)
    )
    patient_journey_snippet = ". ".join(self.patient_journey_sentences[lower:upper])
    messages = Prompt.objects.get(name="START_DATE_MESSAGES").text
    messages.append(
        {
            "role": "user",
            "content": "Text: "
            + patient_journey_snippet
            + "\nActivity label: "
            + row["activity"],
        }
    )
    start = u.query_gpt(messages)

    return start

The method determines the bounds of a text snippet surrounding the sentence of interest by using the sentence_id from the row and the total number of sentences in the patient's journey. With these bounds, it creates a snippet of text that includes relevant context around the activity.

We then use the snippet to query a GPT model, which processes the context and extracts the start date for the activity. Finally, the method returns the start date obtained from the GPT model.

For example, for the given patient journey above:

For the activity noticing fatigue and joint pains, we would return 20230201T0000.

The end date is also extracted using the same principle.

The calculate_duration method is a static method that calculates the duration of an activity based on its start and end timestamps. It operates on a row from a DataFrame, which contains the timestamps for each activity.

@staticmethod
def __calculate_duration(row: pd.Series) -> str:
    """Calculate the duration of an activity."""
    duration = row["time:end_timestamp"] - row["time:timestamp"]
    hours, remainder = divmod(duration.total_seconds(), 3600)
    minutes, seconds = divmod(remainder, 60)

    return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"

The method computes the duration by subtracting the start timestamp (time:timestamp) from the end timestamp (time:end_timestamp). This gives a timedelta object representing the duration of the activity.

Next, the method converts this duration into total seconds and uses the divmod function to break it down into hours, minutes, and seconds. The divmod function first divides the total seconds by 3600 (the number of seconds in an hour) to get the hours and the remainder of seconds. Then, it divides the remainder by 60 to get the minutes and seconds.

Finally, the method returns the duration formatted as a string in the "HH:MM " format, ensuring that each unit is always represented by two digits for consistency and readability.

The __post_processing method is designed to clean and fill missing values in the timestamps within a DataFrame. This method ensures that all date columns are properly formatted and any gaps in the data are handled appropriately.

The method defines several helper functions to carry out specific tasks:

Convert to Datetime:

def convert_to_datetime(df: pd.DataFrame, column: pd.Series) -> pd.DataFrame:
    df[column] = pd.to_datetime(df[column], format="%Y%m%dT%H%M", errors="coerce")
    return df

This function converts the specified column in the DataFrame to datetime format. It uses a specific format and coerces any errors that occur during the conversion.

Set Default Date if NA:

def set_default_date_if_na(df: pd.DataFrame, column: pd.Series) -> pd.DataFrame:
    if df[column].isna().all():
        df[column] = df[column].fillna(pd.Timestamp("2020-01-01 00:00"))
    return df

This function sets a default date of "2020-01-01 00:00" for the entire column if all values in the column are missing (NA).

Fill Missing Values:

def fill_missing_values(df: pd.DataFrame, column: pd.Series) -> pd.DataFrame:
    df[column] = df[column].ffill().bfill()
    return df

This function fills missing values in the column by carrying forward the next valid observation (forward fill) and then carrying backward the last valid observation (backward fill).

Fix End Dates:

def fix_end_dates(row: pd.Series) -> pd.Series:
    if row["time:end_timestamp"] is pd.NaT and row["time:timestamp"] is not pd.NaT:
        row["time:end_timestamp"] = row["time:timestamp"]
    return row

This function ensures that if the end timestamp is missing but the start timestamp is present, the end timestamp is set to be the same as the start timestamp.

In summary, the __post_processing method systematically ensures that all timestamps in the DataFrame are correctly formatted, default values are set where necessary, and any remaining missing values are filled, resulting in a complete and consistent dataset.


Add event types

File Path: tracex_project/extraction/logic/modules/module_event_type_classifier.py

Class: EventTypeClassifier

This module classifies the event types of the activities. The given event types are 'Symptom Onset', 'Symptom Offset', 'Diagnosis', 'Doctor visit', 'Treatment', 'Hospital admission', 'Hospital discharge', 'Medication', 'Lifestyle Change' and 'Feelings'. This is done so that we can extract a standardized set of event types from the patient journey. This is necessary for the application of process mining algorithms. After extraction we would return a dataframe with extracted event types.

Detailed Functions:

def __classify_event_type(activity_label):
    """Classify the event type for a given activity."""
    messages = Prompt.objects.get(name="EVENT_TYPE_MESSAGES").text
    messages.append({"role": "user", "content": activity_label})
    event_type = u.query_gpt(messages)

    return event_type

The __classify_event_type method retrieves a message template, appends the activity label to be classified, queries the GPT model to determine the event type, and returns the classification.

For example:

activity: noticing fatigue and joint pains would return Symptom Onset, which we would use to enrich our dataframe.


Add locations

File Path: tracex_project/extraction/logic/modules/module_event_type_classifier.py

Class: LocationExtractor

This is the module that extracts the location information from the patient journey to each activity. This means all activities are classified to the given locations "Home", "Hospital", "Doctors". After extraction we would return a dataframe with extracted locations.

Detailed Functions:

def __classify_location(activity_label: str) -> str:
    """Classify the location for a given activity."""
    messages = Prompt.objects.get(name="LOCATION_MESSAGES").text
    messages.append({"role": "user", "content": activity_label})
    location = u.query_gpt(messages)

    return location

The __classify_location method retrieves a message template, appends the activity label to be classified, queries the GPT model to determine the location, and returns the classification. This process ensures accurate and efficient classification of locations based on activity labels.

For example:

activity: noting improvement during follow-up appointment would return Doctors, which we would use to enrich our dataframe.


We can also measure metrics, but this modul do not return a data frame.

Measure metrics

File path: tracex_project/extraction/logic/modules/module_metrics_analyzer.py

Class: MetricsAnalyzer

This is the module that runs metrics on the pipelines output. The specified metrics currently used are: - relevance of event information - correctness of timestamps

Detailed functions:

 @staticmethod
 def __rate_activity_relevance(activity: str, condition: str | None) -> str:
     category_mapping = {
         "No Relevance": 0,
         "Low Relevance": 1,
         "Moderate Relevance": 2,
         "High Relevance": 3,
     }

     messages = Prompt.objects.get(name="METRIC_ACTIVITY_MESSAGES").text
     if condition is not None:
         messages.append(
             {
                 "role": "user",
                 "content": activity
                 + "\n\nRate the activity relevance in the context of the course of disease: "
                 + condition,
             }
         )
     else:
         messages.append({"role": "user", "content": activity})

     response = u.query_gpt(messages)
     category = "No Relevance"  # By default, an activity is not relevant.
     for key in category_mapping:
         if key in response:
             category = key
             break

     return category

The __rate_activity_relevance method is a static method designed to rate the relevance of a given activity in the context of a specified condition. It assigns a relevance category based on the response from a GPT model. The method begins by defining a dictionary that maps relevance categories to numerical values. It then initializes the category as "No Relevance" and checks the response to see if any predefined relevance categories are mentioned. If a category is found in the response, it is assigned as the relevance category.

For example:

While examining a patient journey where the condition is having Covid-19.

activity: noticing fatigue and joint pains would return High Relevance and activity: painting a picture would return No Relevance

The __rate_timestamps_correctness method evaluates the correctness of start and end timestamps for a given activity within a patient's journey.

def __rate_timestamps_correctness(
    self, activity: str, start, end
) -> Tuple[str, float]:
    messages = Prompt.objects.get(name="METRIC_TIMESTAMP_MESSAGES").text
    messages.append(
        {
            "role": "user",
            "content": (
                f"Text: {self.patient_journey}\nActivity: {activity}\n\
                Start date: {start}\nEnd date: {end}\n"
            ),
        }
    )

    timestamp_correctness, linear_probability = u.query_gpt(
        messages, return_linear_probability=True, top_logprobs=1
    )

    return timestamp_correctness, linear_probability

This method retrieves the activity, the timestamps, and the entire patient journey. It then queries the GPT model for an evaluation and returns the correctness assessment (true or false) along with the probability of how confident the model is with the answer.

For example:

  • Activity: I went to the doctor on the 1st of January.
  • Start date: 01/01/2024
  • End date: 01/01/2024
  • Patient journey: I went to the doctor on the 1st of January. And was diagnosed me with Covid-19.

This would return: true, 1

Clone this wiki locally