diff --git a/pipeline_lib/core/pipeline.py b/pipeline_lib/core/pipeline.py index 3fe6a2c..41fc377 100644 --- a/pipeline_lib/core/pipeline.py +++ b/pipeline_lib/core/pipeline.py @@ -39,39 +39,53 @@ def __init__( columns_to_ignore_for_training: Optional[list[str]] = None, tracking: Optional[dict] = None, ): - self.data = DataContainer() self.steps = [] self.save_data_path = save_data_path - self.data.target = target - self.data.prediction_column = f"{target}_prediction" - self.data.columns_to_ignore_for_training = columns_to_ignore_for_training or [] + self.target = target + self.prediction_column = f"{target}_prediction" + self.columns_to_ignore_for_training = columns_to_ignore_for_training or [] self.tracking = tracking or {} self.config = {} + def _initialize_data(self) -> DataContainer: + """Initialize the data container.""" + data = DataContainer() + data.target = self.target + data.prediction_column = self.prediction_column + data.columns_to_ignore_for_training = self.columns_to_ignore_for_training + return data + def add_steps(self, steps: list[PipelineStep]): """Add steps to the pipeline.""" self.steps.extend(steps) def run(self, is_train: bool, df: Optional[pd.DataFrame] = None) -> DataContainer: """Run the pipeline on the given data.""" + data = self._initialize_data() if is_train: steps_to_run = [step for step in self.steps if step.used_for_training] self.logger.info("Training the pipeline") else: - self.data.update(DataContainer.from_pickle(self.save_data_path)) + loaded_data = DataContainer.from_pickle(self.save_data_path) + if loaded_data is None: + raise ValueError( + f"Failed to load data from the pickle file ({self.save_data_path})." + ) + data.update(loaded_data) if df is not None: - self.data.raw = df + data.raw = df steps_to_run = [step for step in self.steps if step.used_for_prediction] self.logger.info("Predicting with the pipeline") - self.data.is_train = is_train + data.is_train = is_train + for i, step in enumerate(steps_to_run): start_time = time.time() log_str = f"Running {step.__class__.__name__} - {i + 1} / {len(steps_to_run)}" Pipeline.logger.info(log_str) - data = step.execute(self.data) + data = step.execute(data) Pipeline.logger.info(f"{log_str} done. Took: {time.time() - start_time:.2f}s") diff --git a/pipeline_lib/core/steps/generate.py b/pipeline_lib/core/steps/generate.py index f3536bc..5d58901 100644 --- a/pipeline_lib/core/steps/generate.py +++ b/pipeline_lib/core/steps/generate.py @@ -127,10 +127,12 @@ def execute(self, data: DataContainer) -> DataContainer: ) for key, value in data._generate_step_dtypes.items(): try: - if key in df.columns: + if key == data.target: + # Skip the target column since it's not in the prediction dataframe + continue + elif key in df.columns: df[key] = df[key].astype(value) - elif key != data.target: - # Target column may not be in the prediction dataframe + else: raise ValueError( f"Column {key} from training schema not found in DataFrame" ) diff --git a/pipeline_lib/utils/compression_utils.py b/pipeline_lib/utils/compression_utils.py index eaca304..8ec81ae 100644 --- a/pipeline_lib/utils/compression_utils.py +++ b/pipeline_lib/utils/compression_utils.py @@ -56,4 +56,4 @@ def decompress_zipfile(filename: str): The name of the .zip file to be decompressed, without the .zip extension """ with zipfile.ZipFile(filename + ".zip", "r") as zip_file: - zip_file.extractall() + zip_file.extractall(path=os.path.dirname(filename)) diff --git a/tests/core/steps/test_encoding.py b/tests/core/steps/test_encoding.py index 546899f..6684188 100644 --- a/tests/core/steps/test_encoding.py +++ b/tests/core/steps/test_encoding.py @@ -33,22 +33,6 @@ def train_data_container(train_data: pd.DataFrame) -> DataContainer: return data -def test_check_dtypes(train_data_container: DataContainer): - """Test to check data types after encoding.""" - encode_step = EncodeStep() - result = encode_step.execute(train_data_container) - - assert isinstance(result, DataContainer) - assert result.X_train.shape == (8, 6) - assert result.y_train.shape == (8,) - assert result.X_train["year"].dtype == np.dtype("int64") - assert result.X_train["month"].dtype == np.dtype("int64") - assert result.X_train["day"].dtype == np.dtype("int64") - assert result.X_train["numeric"].dtype == np.dtype("int64") - assert result.X_train["category_low"].dtype == np.dtype("uint8") # optimizing int64 to uint8 - assert result.X_train["category_high"].dtype == np.dtype("float32") # optimizing to float32 - - def test_check_numeric_passthrough(train_data_container: DataContainer): """Test to check if numeric columns are correctly passed through.""" encode_step = EncodeStep() diff --git a/tests/core/test_pipeline.py b/tests/core/test_pipeline.py new file mode 100644 index 0000000..7e6b277 --- /dev/null +++ b/tests/core/test_pipeline.py @@ -0,0 +1,51 @@ +import os + +import pytest + +import pipeline_lib + + +def test_simple_train_pipeline(): + """Test that the pipeline can be trained.""" + pipeline = pipeline_lib.Pipeline.from_json("tests/data/test.json") + pipeline.train() + + zip_file = pipeline.save_data_path + ".zip" + os.remove(zip_file) + + +def test_simple_predict_pipeline(): + """Test that the pipeline can be trained and predicted.""" + pipeline = pipeline_lib.Pipeline.from_json("tests/data/test.json") + pipeline.train() + pipeline.predict() + + zip_file = pipeline.save_data_path + ".zip" + os.remove(zip_file) + + +def test_predict_raises_error_with_no_predict_path_and_df(): + """Test that the pipeline raises an error when no predict path is provided + and no dataframe is provided.""" + pipeline = pipeline_lib.Pipeline.from_json("tests/data/test_without_predict_path.json") + pipeline.train() + + # check that this raies an error + with pytest.raises(ValueError): + try: + pipeline.predict() + finally: + zip_file = pipeline.save_data_path + ".zip" + if os.path.exists(zip_file): + os.remove(zip_file) + + +def test_predict_with_df(): + pipeline = pipeline_lib.Pipeline.from_json("tests/data/test_without_predict_path.json") + data = pipeline.train() + + df = data.raw.drop(columns=[pipeline.target]) + pipeline.predict(df) + + zip_file = pipeline.save_data_path + ".zip" + os.remove(zip_file) diff --git a/tests/data/test.json b/tests/data/test.json new file mode 100644 index 0000000..f6d8d69 --- /dev/null +++ b/tests/data/test.json @@ -0,0 +1,69 @@ +{ + "pipeline": { + "name": "XGBoostTrainingPipeline", + "description": "Training pipeline for XGBoost models.", + "parameters": { + "save_data_path": "tests/data/test.pkl", + "target": "target" + }, + "steps": [ + { + "step_type": "GenerateStep", + "parameters": { + "train_path": "tests/data/train.csv", + "test_path": "tests/data/test.csv", + "predict_path": "tests/data/predict.csv" + } + }, + { + "step_type": "TabularSplitStep", + "parameters": { + "train_percentage": 0.8 + } + }, + { + "step_type": "CleanStep" + }, + { + "step_type": "CalculateFeaturesStep", + "parameters": { + "datetime_columns": "date", + "features": [ + "year", + "month", + "day" + ] + } + }, + { + "step_type": "EncodeStep", + "parameters": { + "feature_encoders": { + "category_high": { + "encoder": "TargetEncoder" + } + } + } + }, + { + "step_type": "ModelStep", + "parameters": { + "model_class": "XGBoost", + "model_parameters": { + "n_estimators": 3, + "max_depth": 3 + } + } + }, + { + "step_type": "CalculateMetricsStep" + }, + { + "step_type": "ExplainerDashboardStep", + "parameters": { + "enable_step": false + } + } + ] + } +} \ No newline at end of file diff --git a/tests/data/test_without_predict_path.json b/tests/data/test_without_predict_path.json new file mode 100644 index 0000000..1f876d3 --- /dev/null +++ b/tests/data/test_without_predict_path.json @@ -0,0 +1,68 @@ +{ + "pipeline": { + "name": "XGBoostTrainingPipeline", + "description": "Training pipeline for XGBoost models.", + "parameters": { + "save_data_path": "tests/data/test_2.pkl", + "target": "target" + }, + "steps": [ + { + "step_type": "GenerateStep", + "parameters": { + "train_path": "tests/data/train.csv", + "test_path": "tests/data/test.csv" + } + }, + { + "step_type": "TabularSplitStep", + "parameters": { + "train_percentage": 0.8 + } + }, + { + "step_type": "CleanStep" + }, + { + "step_type": "CalculateFeaturesStep", + "parameters": { + "datetime_columns": "date", + "features": [ + "year", + "month", + "day" + ] + } + }, + { + "step_type": "EncodeStep", + "parameters": { + "feature_encoders": { + "category_high": { + "encoder": "TargetEncoder" + } + } + } + }, + { + "step_type": "ModelStep", + "parameters": { + "model_class": "XGBoost", + "model_parameters": { + "n_estimators": 3, + "max_depth": 3 + } + } + }, + { + "step_type": "CalculateMetricsStep" + }, + { + "step_type": "ExplainerDashboardStep", + "parameters": { + "enable_step": false + } + } + ] + } +} \ No newline at end of file