From 955d98a2dffe106313f22a1f4598411e5e5cd209 Mon Sep 17 00:00:00 2001
From: ovejabu
Date: Tue, 4 Jun 2024 17:57:04 -0300
Subject: [PATCH] Update docstrings

---
 pipeline_lib/core/data_container.py           |   2 +-
 pipeline_lib/core/random_state_generator.py   |  10 +-
 pipeline_lib/core/steps/calculate_features.py |  58 +++++-
 pipeline_lib/core/steps/calculate_metrics.py  |  22 +++
 pipeline_lib/core/steps/calculate_reports.py  |  18 +-
 pipeline_lib/core/steps/clean.py              | 100 +++++++++-
 pipeline_lib/core/steps/encode.py             | 185 ++++++++++++++++--
 .../core/steps/explainer_dashboard.py         |  22 ++-
 pipeline_lib/core/steps/fit_model.py          | 110 ++++++++++-
 pipeline_lib/core/steps/generate.py           |  57 +++++-
 10 files changed, 554 insertions(+), 30 deletions(-)

diff --git a/pipeline_lib/core/data_container.py b/pipeline_lib/core/data_container.py
index 3755e6a..246e29e 100644
--- a/pipeline_lib/core/data_container.py
+++ b/pipeline_lib/core/data_container.py
@@ -45,7 +45,7 @@ def update(self, other: DataContainer) -> None:
         Update the data in this container with another DataContainer's data.
 
         Parameters
-        =========
+        ----------
         other : DataContainer
             The DataContainer to copy data from.
         """
diff --git a/pipeline_lib/core/random_state_generator.py b/pipeline_lib/core/random_state_generator.py
index f07caca..621684e 100644
--- a/pipeline_lib/core/random_state_generator.py
+++ b/pipeline_lib/core/random_state_generator.py
@@ -10,8 +10,9 @@ def get_random_state() -> Optional[RandomState]:
     """
     Get the global random state object.
 
-    Returns:
-        RandomState or None: The global random state object if initialized, else None.
+    Returns
+    -------
+    RandomState or None: The global random state object if initialized, else None.
     """
     global _random_state
     return _random_state
@@ -21,8 +22,9 @@ def initialize_random_state(seed: int):
     """
     Initialize the global random state object with the provided seed.
 
-    Args:
-        seed (int): The seed value to initialize the random state object.
+    Parameters
+    ----------
+    seed (int): The seed value to initialize the random state object.
     """
     global _random_state
     _random_state = np.random.RandomState(seed)
diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py
index d9fe8be..1cab595 100644
--- a/pipeline_lib/core/steps/calculate_features.py
+++ b/pipeline_lib/core/steps/calculate_features.py
@@ -14,7 +14,7 @@ class UnsupportedFeatureError(Exception):
 
 
 class CalculateFeaturesStep(PipelineStep):
-    """Calculate features."""
+    """Calculate datetime-related features from specified columns."""
 
     used_for_prediction = True
     used_for_training = True
@@ -24,7 +24,14 @@ def __init__(
         datetime_columns: Optional[Union[List[str], str]] = None,
         features: Optional[List[str]] = None,
     ) -> None:
-        """Initialize CalculateFeaturesStep."""
+        """Initialize CalculateFeaturesStep.
+
+        Parameters
+        ----------
+        datetime_columns : Union[List[str], str], optional
+            The name of the column or columns containing datetime values, by default None
+        features : Optional[List[str]], optional
+        """
         super().__init__()
         self.init_logger()
         self.datetime_columns = datetime_columns
@@ -59,7 +66,18 @@ def __init__(
         )
 
     def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
-        """Convert a column to datetime."""
+        """Convert a column to datetime.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame containing the column to convert
+        column : str
+            The name of the column to convert
+        Returns
+        -------
+        pd.DataFrame
+            The DataFrame with the column converted to datetime
+        """
         # Check if the column is already a datetime type
         if not is_datetime64_any_dtype(df[column]):
             try:
@@ -78,7 +96,15 @@ def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> pd.DataF
         return df
 
     def _extract_feature(self, df: pd.DataFrame, column: str, feature: str) -> None:
-        """Extract a single feature from a datetime column."""
+        """Extract a single feature from a datetime column.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame containing the datetime column
+        column : str
+            The name of the datetime column
+        feature : str
+        """
         extractor = self.feature_extractors[feature]
         feature_column = f"{column}_{feature}"
 
@@ -97,7 +123,16 @@ def _extract_feature(self, df: pd.DataFrame, column: str, feature: str) -> None:
             raise ValueError(error_message)
 
     def execute(self, data: DataContainer) -> DataContainer:
-        """Execute the step."""
+        """Execute the step.
+        Parameters
+        ----------
+        data : DataContainer
+            The data container
+        Returns
+        -------
+        DataContainer
+            The updated data container
+        """
         self.logger.info("Calculating features")
 
         if not data.is_train:
@@ -121,7 +156,18 @@ def execute(self, data: DataContainer) -> DataContainer:
     def _create_datetime_features(
         self, df: pd.DataFrame, log: Optional[bool] = False
     ) -> pd.DataFrame:
-        """Create datetime features."""
+        """Create datetime features.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame containing the datetime columns
+        log : Optional[bool], optional
+            Whether to log warnings and errors, by default False
+        Returns
+        -------
+        pd.DataFrame
+            The DataFrame with the datetime features added
+        """
         created_features = []
 
         if self.datetime_columns:
diff --git a/pipeline_lib/core/steps/calculate_metrics.py b/pipeline_lib/core/steps/calculate_metrics.py
index a025830..f624b90 100644
--- a/pipeline_lib/core/steps/calculate_metrics.py
+++ b/pipeline_lib/core/steps/calculate_metrics.py
@@ -20,6 +20,18 @@ def __init__(self) -> None:
         self.init_logger()
 
     def _calculate_metrics(self, true_values: pd.Series, predictions: pd.Series) -> dict:
+        """Calculate metrics.
+        Parameters
+        ----------
+        true_values : pd.Series
+            True values
+        predictions : pd.Series
+            Predictions
+        Returns
+        -------
+        dict
+            Metrics
+        """
         mae = mean_absolute_error(true_values, predictions)
         rmse = np.sqrt(mean_squared_error(true_values, predictions))
         r2 = r2_score(true_values, predictions)
@@ -39,6 +51,16 @@ def _calculate_metrics(self, true_values: pd.Series, predictions: pd.Series) ->
         }
 
     def execute(self, data: DataContainer) -> DataContainer:
+        """Execute the step.
+        Parameters
+        ----------
+        data : DataContainer
+            The data container
+        Returns
+        -------
+        DataContainer
+            The updated data container
+        """
         self.logger.debug("Starting metric calculation")
         metrics = {}
 
diff --git a/pipeline_lib/core/steps/calculate_reports.py b/pipeline_lib/core/steps/calculate_reports.py
index 3bca31f..4e6959f 100644
--- a/pipeline_lib/core/steps/calculate_reports.py
+++ b/pipeline_lib/core/steps/calculate_reports.py
@@ -16,12 +16,26 @@ class CalculateReportsStep(PipelineStep):
     used_for_training = True
 
     def __init__(self, max_samples: int = 1000) -> None:
-        """Initialize CalculateReportsStep."""
+        """Initialize CalculateReportsStep.
+        Parameters
+        ----------
+        max_samples : int, optional
+            Maximum number of samples to use for calculating SHAP values, by default 1000
+        """
         self.init_logger()
         self.max_samples = max_samples
 
     def execute(self, data: DataContainer) -> DataContainer:
-        """Execute the step."""
+        """Execute the step.
+        Parameters
+        ----------
+        data : DataContainer
+            The data container
+        Returns
+        -------
+        DataContainer
+            The updated data container
+        """
         self.logger.info("Calculating reports")
 
         model = data.model
diff --git a/pipeline_lib/core/steps/clean.py b/pipeline_lib/core/steps/clean.py
index b32eefb..d05a549 100644
--- a/pipeline_lib/core/steps/clean.py
+++ b/pipeline_lib/core/steps/clean.py
@@ -7,6 +7,8 @@
 
 
 class CleanStep(PipelineStep):
+    """Clean tabular data."""
+
     used_for_prediction = True
     used_for_training = True
 
@@ -19,6 +21,22 @@ def __init__(
         drop_ids: Optional[dict] = None,
         filter: Optional[dict] = None,
     ):
+        """Initialize CleanStep.
+        Parameters
+        ----------
+        fill_missing : Optional[dict], optional
+            Dictionary containing column names and fill values, by default None
+        remove_outliers : Optional[dict], optional
+            Dictionary containing column names and outlier removal methods, by default None
+        convert_dtypes : Optional[dict], optional
+            Dictionary containing column names and data types, by default None
+        drop_na_columns : Optional[list], optional
+            List of column names to drop rows with missing values, by default None
+        drop_ids : Optional[dict], optional
+            Dictionary containing column names and IDs to drop, by default None
+        filter : Optional[dict], optional
+            Dictionary containing column names and filter conditions, by default None
+        """
         self.init_logger()
         self.fill_missing = fill_missing
         self.remove_outliers = remove_outliers
@@ -28,6 +46,16 @@ def __init__(
         self.filter = filter
 
     def execute(self, data: DataContainer) -> DataContainer:
+        """Execute the step.
+        Parameters
+        ----------
+        data : DataContainer
+            The data container
+        Returns
+        -------
+        DataContainer
+            The updated data container
+        """
         self.logger.info("Cleaning tabular data...")
 
         if not data.is_train:
@@ -46,8 +74,16 @@ def execute(self, data: DataContainer) -> DataContainer:
         return data
 
     def _clean_df(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Clean the DataFrame."""
-
+        """Clean the DataFrame.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame to clean
+        Returns
+        -------
+        pd.DataFrame
+            The cleaned DataFrame
+        """
         df = self._filter(df)
 
         df = self._remove_outliers(df)
@@ -63,6 +99,16 @@ def _clean_df(self, df: pd.DataFrame) -> pd.DataFrame:
         return df
 
     def _filter(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Filter the DataFrame.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame to filter
+        Returns
+        -------
+        pd.DataFrame
+            The filtered DataFrame
+        """
         if self.filter:
             original_rows = len(df)
             for key, value in self.filter.items():
@@ -83,6 +129,16 @@ def _filter(self, df: pd.DataFrame) -> pd.DataFrame:
         return df
 
     def _remove_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Remove outliers from the DataFrame.
+        Parameters
+        ----------
+        df : pd.DataFrame
+            The DataFrame to remove outliers from
+        Returns
+        -------
+        pd.DataFrame
+            The DataFrame without outliers
+        """
         if self.remove_outliers:
             for column, method in self.remove_outliers.items():
                 if column in df.columns:
@@ -110,6 +166,16 @@ def _remove_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
         return df
 
     def _fill_missing(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Fill missing values in the DataFrame.
+ Parameters + ---------- + df : pd.DataFrame + The DataFrame to fill missing values in + Returns + ------- + pd.DataFrame + The DataFrame with missing values filled + """ if self.fill_missing: for column, fill_value in self.fill_missing.items(): if column in df.columns: @@ -122,6 +188,16 @@ def _fill_missing(self, df: pd.DataFrame) -> pd.DataFrame: return df def _convert_dtypes(self, df: pd.DataFrame) -> pd.DataFrame: + """Convert column data types in the DataFrame. + Parameters + ---------- + df : pd.DataFrame + The DataFrame to convert column data types in + Returns + ------- + pd.DataFrame + The DataFrame with converted column data types + """ if self.convert_dtypes: for column, dtype in self.convert_dtypes.items(): if column in df.columns: @@ -132,6 +208,16 @@ def _convert_dtypes(self, df: pd.DataFrame) -> pd.DataFrame: return df def _drop_na_columns(self, df: pd.DataFrame) -> pd.DataFrame: + """Drop rows with missing values in the DataFrame. + Parameters + ---------- + df : pd.DataFrame + The DataFrame to drop rows with missing values in + Returns + ------- + pd.DataFrame + The DataFrame without rows with missing values + """ if self.drop_na_columns: for column in self.drop_na_columns: if column in df.columns: @@ -146,6 +232,16 @@ def _drop_na_columns(self, df: pd.DataFrame) -> pd.DataFrame: return df def _drop_ids(self, df: pd.DataFrame) -> pd.DataFrame: + """Drop rows with specific IDs in the DataFrame. + Parameters + ---------- + df : pd.DataFrame + The DataFrame to drop rows with specific IDs in + Returns + ------- + pd.DataFrame + The DataFrame without rows with specific IDs + """ if self.drop_ids: for column, ids in self.drop_ids.items(): if column in df.columns: diff --git a/pipeline_lib/core/steps/encode.py b/pipeline_lib/core/steps/encode.py index 24ebe64..9349fed 100644 --- a/pipeline_lib/core/steps/encode.py +++ b/pipeline_lib/core/steps/encode.py @@ -35,13 +35,29 @@ def __init__( cardinality_threshold: int = 5, feature_encoders: Optional[dict] = None, ) -> None: - """Initialize EncodeStep.""" + """Initialize EncodeStep. + Parameters + ---------- + cardinality_threshold : int, optional + The threshold to determine low and high cardinality features, by default 5 + feature_encoders : Optional[dict], optional + A dictionary mapping feature names to encoder configurations, by default None + """ self.init_logger() self.cardinality_threshold = cardinality_threshold self.feature_encoders = feature_encoders or {} def execute(self, data: DataContainer) -> DataContainer: - """Execute the encoding step.""" + """Execute the encoding step. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ self.logger.info("Encoding data") target_column_name = data.target @@ -108,7 +124,30 @@ def _apply_encoding( saved_encoder: Optional[ColumnTransformer] = None, log: Optional[bool] = False, ) -> Tuple[pd.DataFrame, Optional[pd.Series], Optional[ColumnTransformer]]: - """Apply the encoding to the data.""" + """Apply the encoding to the data. 
+ Parameters + ---------- + df : pd.DataFrame + The DataFrame to encode + target_column_name : str + The target column name + columns_to_ignore_for_training : List[str] + Columns to ignore for training + categorical_features : List[str] + Categorical features + numeric_features : List[str] + Numeric features + fit_encoders : bool, optional + Whether to fit the encoders, by default False + saved_encoder : Optional[ColumnTransformer], optional + The saved encoder, by default None + log : Optional[bool], optional + Whether to log information about the features, by default False + Returns + ------- + Tuple[pd.DataFrame, Optional[pd.Series], Optional[ColumnTransformer]] + The encoded data, the target column, and the encoder + """ if not fit_encoders and not saved_encoder: raise ValueError("saved_encoder must be provided when fit_encoders is False.") @@ -160,7 +199,18 @@ def _apply_encoding( def _get_feature_types( self, df: pd.DataFrame, target_column_name: str ) -> Tuple[List[str], List[str]]: - """Get categorical and numeric feature lists.""" + """Get categorical and numeric feature lists. + Parameters + ---------- + df : pd.DataFrame + The DataFrame + target_column_name : str + The target column name + Returns + ------- + Tuple[List[str], List[str]] + Categorical and numeric features + """ categorical_features = [ col for col in df.columns @@ -177,7 +227,18 @@ def _get_feature_types( def _split_categorical_features( self, df: pd.DataFrame, categorical_features: List[str] ) -> Tuple[List[str], List[str]]: - """Split categorical features into low and high cardinality features.""" + """Split categorical features into low and high cardinality features. + Parameters + ---------- + df : pd.DataFrame + The DataFrame + categorical_features : List[str] + Categorical features + Returns + ------- + Tuple[List[str], List[str]] + Low and high cardinality features + """ low_cardinality_features = [ col for col in categorical_features if df[col].nunique() <= self.cardinality_threshold ] @@ -189,7 +250,16 @@ def _split_categorical_features( def _get_encoder_class_and_params( self, encoder_name: str ) -> Tuple[Union[Type[OrdinalEncoder], Type[TargetEncoder]], dict[str, Any]]: - """Map encoder name to the corresponding encoder class.""" + """Map encoder name to the corresponding encoder class. + Parameters + ---------- + encoder_name : str + The encoder name + Returns + ------- + Tuple[Union[Type[OrdinalEncoder], Type[TargetEncoder]], dict[str, Any]] + The encoder class and the encoder parameters + """ encoder = self.ENCODER_MAP.get(encoder_name) encoder_params = self.ENCODER_MAP_PARAMS.get(encoder_name) @@ -219,6 +289,19 @@ def _create_column_transformer( * For low cardinality features, use OrdinalEncoder. * For high cardinality features, use TargetEncoder. * For numeric features, pass them as is. + + Parameters + ---------- + high_cardinality_features : List[str] + High cardinality features + low_cardinality_features : List[str] + Low cardinality features + numeric_features : List[str] + Numeric features + Returns + ------- + ColumnTransformer + The ColumnTransformer """ transformers = [] @@ -270,7 +353,22 @@ def _transform_data( column_transformer: ColumnTransformer, is_train: bool = False, ) -> tuple[pd.DataFrame, Optional[pd.Series]]: - """Transform the data using the ColumnTransformer.""" + """Transform the data using the ColumnTransformer. 
+ Parameters + ---------- + df : pd.DataFrame + The DataFrame to transform + target_column_name : str + The target column name + column_transformer : ColumnTransformer + The ColumnTransformer + is_train : bool, optional + Whether the data is for training, by default False + Returns + ------- + tuple[pd.DataFrame, Optional[pd.Series]] + The transformed data and the target column + """ if target_column_name in df.columns: X = df.drop(columns=[target_column_name]) # Drop the target column y = df[target_column_name] # Target column for training data @@ -292,14 +390,36 @@ def _transform_data( ) def _restore_column_order(self, df: pd.DataFrame, encoded_data: pd.DataFrame) -> pd.DataFrame: - """Restore the original column order.""" + """Restore the original column order. + Parameters + ---------- + df : pd.DataFrame + The original DataFrame + encoded_data : pd.DataFrame + The encoded DataFrame + Returns + ------- + pd.DataFrame + The encoded DataFrame with the original column order + """ new_column_order = [col for col in df.columns if col in encoded_data.columns] return encoded_data[new_column_order] def _convert_ordinal_encoded_columns_to_int( self, encoded_data: pd.DataFrame, encoded_feature_map: Dict[str, str] ) -> pd.DataFrame: - """Convert ordinal encoded columns to the smallest possible integer dtype.""" + """Convert ordinal encoded columns to the smallest possible integer dtype. + Parameters + ---------- + encoded_data : pd.DataFrame + The encoded DataFrame + encoded_feature_map : Dict[str, str] + The feature encoder map + Returns + ------- + pd.DataFrame + The encoded DataFrame with ordinal encoded columns converted to int + """ ordinal_encoded_features = [ col for col, encoder in encoded_feature_map.items() if encoder == "OrdinalEncoder" ] @@ -325,7 +445,18 @@ def _convert_ordinal_encoded_columns_to_int( def _restore_numeric_dtypes( self, encoded_data: pd.DataFrame, original_numeric_dtypes: dict ) -> pd.DataFrame: - """Restore original dtypes of numeric features.""" + """Restore original dtypes of numeric features. + Parameters + ---------- + encoded_data : pd.DataFrame + The encoded DataFrame + original_numeric_dtypes : dict + The original numeric dtypes + Returns + ------- + pd.DataFrame + The encoded DataFrame with the original numeric dtypes + """ for col, dtype in original_numeric_dtypes.items(): if col in encoded_data.columns: try: @@ -337,7 +468,16 @@ def _restore_numeric_dtypes( return encoded_data def _convert_float64_to_float32(self, encoded_data: pd.DataFrame) -> pd.DataFrame: - """Convert float64 columns to float32.""" + """Convert float64 columns to float32. + Parameters + ---------- + encoded_data : pd.DataFrame + The encoded DataFrame + Returns + ------- + pd.DataFrame + The encoded DataFrame with float64 columns converted to float32 + """ float64_columns = encoded_data.select_dtypes(include=["float64"]).columns for col in float64_columns: encoded_data[col] = encoded_data[col].astype(np.float32) @@ -346,6 +486,14 @@ def _convert_float64_to_float32(self, encoded_data: pd.DataFrame) -> pd.DataFram def _create_feature_encoder_map(self, column_transformer: ColumnTransformer) -> Dict[str, str]: """ Create a dictionary to store the encoder used for each feature. 
+ Parameters + ---------- + column_transformer : ColumnTransformer + The ColumnTransformer + Returns + ------- + Dict[str, str] + The feature encoder map """ feature_encoder_map = {} transformed_features = column_transformer.get_feature_names_out() @@ -370,7 +518,20 @@ def _log_feature_info( high_cardinality_features: List[str], feature_encoder_map: Dict[str, str], ) -> None: - """Log information about the features.""" + """Log information about the features. + Parameters + ---------- + categorical_features : List[str] + Categorical features + numeric_features : List[str] + Numeric features + low_cardinality_features : List[str] + Low cardinality features + high_cardinality_features : List[str] + High cardinality features + feature_encoder_map : Dict[str, str] + The feature encoder map + """ self.logger.info( f"Categorical features: ({len(categorical_features)}) - {categorical_features}" ) diff --git a/pipeline_lib/core/steps/explainer_dashboard.py b/pipeline_lib/core/steps/explainer_dashboard.py index 1ba9596..3c5cc96 100644 --- a/pipeline_lib/core/steps/explainer_dashboard.py +++ b/pipeline_lib/core/steps/explainer_dashboard.py @@ -8,7 +8,7 @@ class ExplainerDashboardStep(PipelineStep): - """Scale the target using Quantile Transformer.""" + """Create an explainer dashboard for the model.""" used_for_prediction = False used_for_training = True @@ -19,12 +19,32 @@ def __init__( X_background_samples: int = 100, enable_step: bool = True, ) -> None: + """Initialize ExplainerDashboardStep. + Parameters + ---------- + max_samples : int, optional + Maximum number of samples to use for the explainer dashboard, by default 1000 + X_background_samples : int, optional + Number of samples to use for the background dataset, by default 100 + enable_step : bool, optional + Enable or disable the step, by default True + """ self.init_logger() self.max_samples = max_samples self.X_background_samples = X_background_samples self.enable_step = enable_step def execute(self, data: DataContainer) -> DataContainer: + """Execute the step. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ if not self.enable_step: self.logger.info("ExplainerDashboardStep disabled, skipping execution") return data diff --git a/pipeline_lib/core/steps/fit_model.py b/pipeline_lib/core/steps/fit_model.py index 0579dc5..49fe8b3 100644 --- a/pipeline_lib/core/steps/fit_model.py +++ b/pipeline_lib/core/steps/fit_model.py @@ -15,7 +15,17 @@ class OptunaOptimizer: + """Optuna optimizer for hyperparameter tuning.""" + def __init__(self, optuna_params: dict, logger: logging.Logger) -> None: + """Initialize OptunaOptimizer. + Parameters + ---------- + optuna_params : dict + Dictionary containing the Optuna parameters + logger : logging.Logger + The logger object + """ self.optuna_params = optuna_params self.logger = logger @@ -28,6 +38,27 @@ def optimize( model_class: Type[Model], model_parameters: dict, ) -> dict: + """Optimize the model hyperparameters using Optuna. 
+ Parameters + ---------- + X_train : pd.DataFrame + The training dataset + y_train : pd.Series + The training target + X_validation : pd.DataFrame + The validation dataset + y_validation : pd.Series + The validation target + model_class : Type[Model] + The model class to optimize + model_parameters : dict + The model parameters to optimize + Returns + ------- + dict + The best hyperparameters found by Optuna + """ + def objective(trial): # Create a copy of model_parameters, then update with the optuna hyperparameters param = {} @@ -49,6 +80,16 @@ def objective(trial): return best_params def _create_trial_params(self, trial) -> dict: + """Create a dictionary of hyperparameters for a single Optuna trial. + Parameters + ---------- + trial : optuna.Trial + The Optuna trial object + Returns + ------- + dict + The hyperparameters for the trial + """ param = {} for key, value in self.optuna_params.get("search_space", {}).items(): if isinstance(value, dict): @@ -60,6 +101,12 @@ def _create_trial_params(self, trial) -> dict: return param def _create_study(self) -> optuna.Study: + """Create an Optuna study. + Returns + ------- + optuna.Study + The Optuna study object + """ study_name = self.optuna_params.get("study_name") storage = self.optuna_params.get("storage", "sqlite:///db.sqlite3") load_if_exists = self.optuna_params.get("load_if_exists", False) @@ -78,6 +125,20 @@ def _create_study(self) -> optuna.Study: @staticmethod def _calculate_error(y_true, y_pred, metric): + """Calculate the error between the true and predicted values. + Parameters + ---------- + y_true : np.ndarray + The true target values + y_pred : np.ndarray + The predicted target values + metric : str + The error metric to calculate + Returns + ------- + float + The error value + """ metrics = { "mae": mean_absolute_error, "mse": mean_squared_error, @@ -91,6 +152,8 @@ def _calculate_error(y_true, y_pred, metric): class ModelStep(PipelineStep): + """Fit and predict with a model.""" + used_for_training = True used_for_prediction = True @@ -100,6 +163,16 @@ def __init__( model_parameters: Optional[dict] = None, optuna_params: Optional[dict] = None, ) -> None: + """Initialize ModelStep. + Parameters + ---------- + model_class : Type[Model] + The model class to use + model_parameters : dict, optional + The model parameters, by default None + optuna_params : dict, optional + The Optuna parameters for hyperparameter tuning, by default None + """ super().__init__() self.init_logger() self.model_class = model_class @@ -107,12 +180,32 @@ def __init__( self.optuna_params = optuna_params def execute(self, data: DataContainer) -> DataContainer: + """Execute the step. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ if data.is_train: return self.train(data) return self.predict(data) def train(self, data: DataContainer) -> DataContainer: + """Train the model. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ self.logger.info(f"Fitting the {self.model_class.__name__} model") model_parameters = self.model_parameters @@ -151,7 +244,12 @@ def train(self, data: DataContainer) -> DataContainer: return data def _save_datasets_predictions(self, data: DataContainer) -> None: - """Save the predictions for each dataset (train, val, test) in the DataContainer.""" + """Save the predictions for each dataset (train, val, test) in the DataContainer. 
+ Parameters + ---------- + data : DataContainer + The data container + """ for dataset_name in ["train", "validation", "test"]: dataset = getattr(data, dataset_name, None) encoded_dataset = getattr(data, f"X_{dataset_name}", None) @@ -165,6 +263,16 @@ def _save_datasets_predictions(self, data: DataContainer) -> None: dataset[data.prediction_column] = data.model.predict(encoded_dataset) def predict(self, data: DataContainer) -> DataContainer: + """Predict with the model. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ self.logger.info(f"Predicting with {self.model_class.__name__} model") data.flow[data.prediction_column] = data.model.predict(data.X_prediction) data.predictions = data.flow[data.prediction_column] diff --git a/pipeline_lib/core/steps/generate.py b/pipeline_lib/core/steps/generate.py index 5d58901..94ec79d 100644 --- a/pipeline_lib/core/steps/generate.py +++ b/pipeline_lib/core/steps/generate.py @@ -16,6 +16,8 @@ class FileType(Enum): class GenerateStep(PipelineStep): + """Generate data from a file.""" + used_for_prediction = True used_for_training = True @@ -39,7 +41,16 @@ def __init__( self.optimize_dtypes_skip_cols = optimize_dtypes_skip_cols or [] def execute(self, data: DataContainer) -> DataContainer: - """Generate the data from the file.""" + """Execute the step. + Parameters + ---------- + data : DataContainer + The data container + Returns + ------- + DataContainer + The updated data container + """ # Skip GenerateStep if the data is already loaded if not data.is_train and data.raw is not None: @@ -153,6 +164,16 @@ def execute(self, data: DataContainer) -> DataContainer: return data def _infer_file_type(self, file_path: str) -> FileType: + """Infer the file type based on the file extension. + Parameters + ---------- + file_path : str + The file path + Returns + ------- + FileType + The file type + """ _, file_extension = os.path.splitext(file_path) file_extension = file_extension.lower() @@ -162,6 +183,18 @@ def _infer_file_type(self, file_path: str) -> FileType: raise ValueError(f"Unsupported file extension: {file_extension}") def _read_csv(self, file_path: str, **kwargs) -> pd.DataFrame: + """Read a CSV file. + Parameters + ---------- + file_path : str + The file path + **kwargs + Additional keyword arguments to pass to pd.read_csv + Returns + ------- + pd.DataFrame + The DataFrame + """ index_col = kwargs.pop("index", None) self.logger.info(f"Reading CSV file with kwargs: {kwargs}") df = pd.read_csv(file_path, **kwargs) @@ -170,6 +203,18 @@ def _read_csv(self, file_path: str, **kwargs) -> pd.DataFrame: return df def _read_parquet(self, file_path: str, **kwargs) -> pd.DataFrame: + """Read a parquet file. + Parameters + ---------- + file_path : str + The file path + **kwargs + Additional keyword arguments to pass to pd.read_parquet + Returns + ------- + pd.DataFrame + The DataFrame + """ index_col = kwargs.pop("index", None) self.logger.info(f"Reading parquet file with kwargs: {kwargs}") df = pd.read_parquet(file_path, **kwargs) @@ -178,6 +223,16 @@ def _read_parquet(self, file_path: str, **kwargs) -> pd.DataFrame: return df def _load_data_from_file(self, file_path: str) -> pd.DataFrame: + """Load data from a file. + Parameters + ---------- + file_path : str + The file path + Returns + ------- + pd.DataFrame + The DataFrame + """ file_type = self._infer_file_type(file_path) if file_type == FileType.CSV: