Skip to content

Commit

Permalink
add pipeline test and fix important errors
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarvid committed May 19, 2024
1 parent 09481f3 commit acaee31
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 28 deletions.
30 changes: 22 additions & 8 deletions pipeline_lib/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,53 @@ def __init__(
columns_to_ignore_for_training: Optional[list[str]] = None,
tracking: Optional[dict] = None,
):
self.data = DataContainer()
self.steps = []
self.save_data_path = save_data_path
self.data.target = target
self.data.prediction_column = f"{target}_prediction"
self.data.columns_to_ignore_for_training = columns_to_ignore_for_training or []
self.target = target
self.prediction_column = f"{target}_prediction"
self.columns_to_ignore_for_training = columns_to_ignore_for_training or []
self.tracking = tracking or {}
self.config = {}

def _initialize_data(self) -> DataContainer:
    """Build a fresh data container seeded with the pipeline's settings.

    A new ``DataContainer`` is created on every call and populated with the
    target name, the prediction column name and the columns to ignore for
    training that are stored on the pipeline.

    Returns:
        A new ``DataContainer`` carrying the pipeline-level configuration.
    """
    data = DataContainer()
    data.target = self.target
    data.prediction_column = self.prediction_column
    # Hand out a copy: if a step mutates the container's list, the
    # pipeline-level configuration reused by later runs must stay untouched.
    data.columns_to_ignore_for_training = list(self.columns_to_ignore_for_training)
    return data

def add_steps(self, steps: list[PipelineStep]):
    """Append the given steps, in order, to the end of the pipeline's step list."""
    self.steps += steps

def run(self, is_train: bool, df: Optional[pd.DataFrame] = None) -> DataContainer:
"""Run the pipeline on the given data."""
data = self._initialize_data()

if is_train:
steps_to_run = [step for step in self.steps if step.used_for_training]
self.logger.info("Training the pipeline")
else:
self.data.update(DataContainer.from_pickle(self.save_data_path))
loaded_data = DataContainer.from_pickle(self.save_data_path)
if loaded_data is None:
raise ValueError(
f"Failed to load data from the pickle file ({self.save_data_path})."
)
data.update(loaded_data)
if df is not None:
self.data.raw = df
data.raw = df
steps_to_run = [step for step in self.steps if step.used_for_prediction]
self.logger.info("Predicting with the pipeline")

self.data.is_train = is_train
data.is_train = is_train

for i, step in enumerate(steps_to_run):
start_time = time.time()
log_str = f"Running {step.__class__.__name__} - {i + 1} / {len(steps_to_run)}"
Pipeline.logger.info(log_str)

data = step.execute(self.data)
data = step.execute(data)

Pipeline.logger.info(f"{log_str} done. Took: {time.time() - start_time:.2f}s")

Expand Down
8 changes: 5 additions & 3 deletions pipeline_lib/core/steps/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,12 @@ def execute(self, data: DataContainer) -> DataContainer:
)
for key, value in data._generate_step_dtypes.items():
try:
if key in df.columns:
if key == data.target:
# Skip the target column: it may be absent from the prediction dataframe
continue
elif key in df.columns:
df[key] = df[key].astype(value)
elif key != data.target:
# Target column may not be in the prediction dataframe
else:
raise ValueError(
f"Column {key} from training schema not found in DataFrame"
)
Expand Down
2 changes: 1 addition & 1 deletion pipeline_lib/utils/compression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ def decompress_zipfile(filename: str):
The name of the .zip file to be decompressed, without the .zip extension
"""
with zipfile.ZipFile(filename + ".zip", "r") as zip_file:
zip_file.extractall()
zip_file.extractall(path=os.path.dirname(filename))
16 changes: 0 additions & 16 deletions tests/core/steps/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,6 @@ def train_data_container(train_data: pd.DataFrame) -> DataContainer:
return data


def test_check_dtypes(train_data_container: DataContainer):
    """Verify the shapes and the per-column dtypes of the encoded training data."""
    result = EncodeStep().execute(train_data_container)

    assert isinstance(result, DataContainer)
    assert result.X_train.shape == (8, 6)
    assert result.y_train.shape == (8,)

    # Expected dtype of every feature column after the encode step.
    expected_dtypes = {
        "year": "int64",
        "month": "int64",
        "day": "int64",
        "numeric": "int64",
        "category_low": "uint8",  # optimizing int64 to uint8
        "category_high": "float32",  # optimizing to float32
    }
    for column, dtype_name in expected_dtypes.items():
        assert result.X_train[column].dtype == np.dtype(dtype_name)


def test_check_numeric_passthrough(train_data_container: DataContainer):
"""Test to check if numeric columns are correctly passed through."""
encode_step = EncodeStep()
Expand Down
51 changes: 51 additions & 0 deletions tests/core/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os

import pytest

import pipeline_lib


def test_simple_train_pipeline():
    """Test that the pipeline can be trained.

    The archive written next to ``save_data_path`` is removed afterwards,
    whether or not the test body succeeds.
    """
    pipeline = pipeline_lib.Pipeline.from_json("tests/data/test.json")
    zip_file = pipeline.save_data_path + ".zip"

    try:
        pipeline.train()
        # The unguarded ``os.remove`` used previously failed the test when the
        # archive was missing; keep that check explicit.
        assert os.path.exists(zip_file)
    finally:
        # Guarded cleanup so that a failure before the archive is written is
        # not masked by a FileNotFoundError raised from here.
        if os.path.exists(zip_file):
            os.remove(zip_file)


def test_simple_predict_pipeline():
    """Test that the pipeline can be trained and then used for prediction.

    The archive written next to ``save_data_path`` is removed afterwards,
    whether or not the test body succeeds.
    """
    pipeline = pipeline_lib.Pipeline.from_json("tests/data/test.json")
    zip_file = pipeline.save_data_path + ".zip"

    try:
        pipeline.train()
        pipeline.predict()
        # The unguarded ``os.remove`` used previously failed the test when the
        # archive was missing; keep that check explicit.
        assert os.path.exists(zip_file)
    finally:
        # Guarded cleanup so that an earlier failure is not masked by a
        # FileNotFoundError raised from here.
        if os.path.exists(zip_file):
            os.remove(zip_file)


def test_predict_raises_error_with_no_predict_path_and_df():
    """Test that predicting raises a ValueError when neither a predict path
    nor a dataframe is provided."""
    pipeline = pipeline_lib.Pipeline.from_json("tests/data/test_without_predict_path.json")
    zip_file = pipeline.save_data_path + ".zip"

    try:
        pipeline.train()

        # Check that this raises an error.
        with pytest.raises(ValueError):
            pipeline.predict()
    finally:
        # Runs for training failures too, so no archive is left behind.
        if os.path.exists(zip_file):
            os.remove(zip_file)


def test_predict_with_df():
    """Test that prediction works on a dataframe passed in directly.

    The pipeline is configured without a predict path; the dataframe handed to
    ``predict`` is the raw training data with the target column dropped.
    """
    pipeline = pipeline_lib.Pipeline.from_json("tests/data/test_without_predict_path.json")
    zip_file = pipeline.save_data_path + ".zip"

    try:
        data = pipeline.train()

        df = data.raw.drop(columns=[pipeline.target])
        pipeline.predict(df)
        # The unguarded ``os.remove`` used previously failed the test when the
        # archive was missing; keep that check explicit.
        assert os.path.exists(zip_file)
    finally:
        # Guarded cleanup so that an earlier failure is not masked by a
        # FileNotFoundError raised from here.
        if os.path.exists(zip_file):
            os.remove(zip_file)
69 changes: 69 additions & 0 deletions tests/data/test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"pipeline": {
"name": "XGBoostTrainingPipeline",
"description": "Training pipeline for XGBoost models.",
"parameters": {
"save_data_path": "tests/data/test.pkl",
"target": "target"
},
"steps": [
{
"step_type": "GenerateStep",
"parameters": {
"train_path": "tests/data/train.csv",
"test_path": "tests/data/test.csv",
"predict_path": "tests/data/predict.csv"
}
},
{
"step_type": "TabularSplitStep",
"parameters": {
"train_percentage": 0.8
}
},
{
"step_type": "CleanStep"
},
{
"step_type": "CalculateFeaturesStep",
"parameters": {
"datetime_columns": "date",
"features": [
"year",
"month",
"day"
]
}
},
{
"step_type": "EncodeStep",
"parameters": {
"feature_encoders": {
"category_high": {
"encoder": "TargetEncoder"
}
}
}
},
{
"step_type": "ModelStep",
"parameters": {
"model_class": "XGBoost",
"model_parameters": {
"n_estimators": 3,
"max_depth": 3
}
}
},
{
"step_type": "CalculateMetricsStep"
},
{
"step_type": "ExplainerDashboardStep",
"parameters": {
"enable_step": false
}
}
]
}
}
68 changes: 68 additions & 0 deletions tests/data/test_without_predict_path.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"pipeline": {
"name": "XGBoostTrainingPipeline",
"description": "Training pipeline for XGBoost models.",
"parameters": {
"save_data_path": "tests/data/test_2.pkl",
"target": "target"
},
"steps": [
{
"step_type": "GenerateStep",
"parameters": {
"train_path": "tests/data/train.csv",
"test_path": "tests/data/test.csv"
}
},
{
"step_type": "TabularSplitStep",
"parameters": {
"train_percentage": 0.8
}
},
{
"step_type": "CleanStep"
},
{
"step_type": "CalculateFeaturesStep",
"parameters": {
"datetime_columns": "date",
"features": [
"year",
"month",
"day"
]
}
},
{
"step_type": "EncodeStep",
"parameters": {
"feature_encoders": {
"category_high": {
"encoder": "TargetEncoder"
}
}
}
},
{
"step_type": "ModelStep",
"parameters": {
"model_class": "XGBoost",
"model_parameters": {
"n_estimators": 3,
"max_depth": 3
}
}
},
{
"step_type": "CalculateMetricsStep"
},
{
"step_type": "ExplainerDashboardStep",
"parameters": {
"enable_step": false
}
}
]
}
}

0 comments on commit acaee31

Please sign in to comment.