diff --git a/.github/workflows/pypi-release-abc-pipeline.yml b/.github/workflows/pypi-release-abc-pipeline.yml new file mode 100644 index 000000000..3023a0cf1 --- /dev/null +++ b/.github/workflows/pypi-release-abc-pipeline.yml @@ -0,0 +1,44 @@ +name: Build aboutcode-pipeline Python distributions and publish on PyPI + +on: + workflow_dispatch: + push: + tags: + - "pipeline-v*.*.*" + +jobs: + build-and-publish: + name: Build and publish library to PyPI + runs-on: ubuntu-22.04 + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.12 + + - name: Install flot + run: python -m pip install flot --user + + - name: Build a binary wheel and a source tarball + run: python -m flot --pyproject pyproject-pipeline.toml --sdist --wheel --output-dir dist/ + + - name: Publish to PyPI + if: startsWith(github.ref, 'refs/tags') + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_API_TOKEN }} + + - name: Upload built archives + uses: actions/upload-artifact@v4 + with: + name: pypi_archives + path: dist/* + + - name: Create a GitHub release + uses: softprops/action-gh-release@v1 + with: + draft: false + files: dist/* diff --git a/pipeline/__init__.py b/pipeline/__init__.py new file mode 100644 index 000000000..be398dbce --- /dev/null +++ b/pipeline/__init__.py @@ -0,0 +1,330 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/nexB/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/nexB/scancode.io for support and download. + +import logging +import traceback +from pydoc import getdoc +from pydoc import splitdoc +from timeit import default_timer as timer + +from django.utils import timezone + +import bleach +from markdown_it import MarkdownIt + +logger = logging.getLogger(__name__) + + +""" +Pipeline: steps definition, documentation +Run: context (groups, steps), execution, logging, and results + +from pipeline import BasePipeline +from pipeline import BasePipelineRun + +class DoSomething(BasePipeline, BasePipelineRun): + @classmethod + def steps(cls): + return (cls.step1,) + def step1(self): + print("Message from step1") + +# 1. Run pipeline +run = DoSomething() +run.execute() + +# 2. Run pipeline with selected groups +run = BasePipelineRun(selected_groups=["group1", "group2"]) +run.execute() +""" + + +def group(*groups): + """Mark a function as part of a particular group.""" + + def decorator(obj): + if hasattr(obj, "groups"): + obj.groups = obj.groups.union(groups) + else: + setattr(obj, "groups", set(groups)) + return obj + + return decorator + + +def convert_markdown_to_html(markdown_text): + """Convert Markdown text to sanitized HTML.""" + # Using the "js-default" for safety. + html_content = MarkdownIt("js-default").renderInline(markdown_text) + # Sanitize HTML using bleach. + sanitized_html = bleach.clean(html_content) + return sanitized_html + + +def humanize_time(seconds): + """Convert the provided ``seconds`` number into human-readable time.""" + message = f"{seconds:.0f} seconds" + + if seconds > 86400: + message += f" ({seconds / 86400:.1f} days)" + if seconds > 3600: + message += f" ({seconds / 3600:.1f} hours)" + elif seconds > 60: + message += f" ({seconds / 60:.1f} minutes)" + + return message + + +class LoopProgress: + """ + A context manager for logging progress in loops. + + Usage:: + + total_iterations = 100 + logger = print # Replace with your actual logger function + + progress = LoopProgress(total_iterations, logger, progress_step=10) + for item in progress.iter(iterator): + "Your processing logic here" + + with LoopProgress(total_iterations, logger, progress_step=10) as progress: + for item in progress.iter(iterator): + "Your processing logic here" + """ + + def __init__(self, total_iterations, logger, progress_step=10): + self.total_iterations = total_iterations + self.logger = logger + self.progress_step = progress_step + self.start_time = timer() + self.last_logged_progress = 0 + self.current_iteration = 0 + + def get_eta(self, current_progress): + run_time = timer() - self.start_time + return round(run_time / current_progress * (100 - current_progress)) + + @property + def current_progress(self): + return int((self.current_iteration / self.total_iterations) * 100) + + @property + def eta(self): + run_time = timer() - self.start_time + return round(run_time / self.current_progress * (100 - self.current_progress)) + + def log_progress(self): + reasons_to_skip = [ + not self.logger, + not self.current_iteration > 0, + self.total_iterations <= self.progress_step, + ] + if any(reasons_to_skip): + return + + if self.current_progress >= self.last_logged_progress + self.progress_step: + msg = ( + f"Progress: {self.current_progress}% " + f"({self.current_iteration}/{self.total_iterations})" + ) + if eta := self.eta: + msg += f" ETA: {humanize_time(eta)}" + + self.logger(msg) + self.last_logged_progress = self.current_progress + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + def iter(self, iterator): + for item in iterator: + self.current_iteration += 1 + self.log_progress() + yield item + + +class BasePipelineRun: + """Base class for all pipeline run (execution).""" + + def __init__(self, selected_groups=None, selected_steps=None): + """Load the Pipeline class.""" + self.pipeline_class = self.__class__ + self.pipeline_name = self.pipeline_class.__name__ + + self.selected_groups = selected_groups + self.selected_steps = selected_steps or [] + + self.execution_log = [] + self.current_step = "" + + def append_to_log(self, message): + self.execution_log.append(message) + + def set_current_step(self, message): + self.current_step = message + + def log(self, message): + """Log the given `message` to the current module logger and Run instance.""" + now_as_localtime = timezone.localtime(timezone.now()) + timestamp = now_as_localtime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4] + message = f"{timestamp} {message}" + logger.info(message) + self.append_to_log(message) + + @staticmethod + def output_from_exception(exception): + """Return a formatted error message including the traceback.""" + output = f"{exception}\n\n" + + if exception.__cause__ and str(exception.__cause__) != str(exception): + output += f"Cause: {exception.__cause__}\n\n" + + traceback_formatted = "".join(traceback.format_tb(exception.__traceback__)) + output += f"Traceback:\n{traceback_formatted}" + + return output + + def execute(self): + """Execute each steps in the order defined on this pipeline class.""" + self.log(f"Pipeline [{self.pipeline_name}] starting") + + steps = self.pipeline_class.get_steps(groups=self.selected_groups) + selected_steps = self.selected_steps + + steps_count = len(steps) + pipeline_start_time = timer() + + for current_index, step in enumerate(steps, start=1): + step_name = step.__name__ + + if selected_steps and step_name not in selected_steps: + self.log(f"Step [{step_name}] skipped") + continue + + self.set_current_step(f"{current_index}/{steps_count} {step_name}") + self.log(f"Step [{step_name}] starting") + step_start_time = timer() + + try: + step(self) + except Exception as exception: + self.log("Pipeline failed") + return 1, self.output_from_exception(exception) + + step_run_time = timer() - step_start_time + self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}") + + self.set_current_step("") # Reset the `current_step` field on completion + pipeline_run_time = timer() - pipeline_start_time + self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}") + + return 0, "" + + +class BasePipeline: + """Base class for all pipeline implementations.""" + + # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first. + is_addon = False + + @classmethod + def steps(cls): + raise NotImplementedError + + @classmethod + def get_steps(cls, groups=None): + """ + Return the list of steps defined in the ``steps`` class method. + + If the optional ``groups`` parameter is provided, only include steps labeled + with groups that intersect with the provided list. If a step has no groups or + if ``groups`` is not specified, include the step in the result. + """ + if not callable(cls.steps): + raise TypeError("Use a ``steps(cls)`` classmethod to declare the steps.") + + steps = cls.steps() + + if groups is not None: + steps = tuple( + step + for step in steps + if not getattr(step, "groups", []) + or set(getattr(step, "groups")).intersection(groups) + ) + + return steps + + @classmethod + def get_doc(cls): + """Get the doc string of this pipeline.""" + return getdoc(cls) + + @classmethod + def get_graph(cls): + """Return a graph of steps.""" + return [ + { + "name": step.__name__, + "doc": getdoc(step), + "groups": getattr(step, "groups", []), + } + for step in cls.get_steps() + ] + + @classmethod + def get_info(cls, as_html=False): + """Get a dictionary of combined information data about this pipeline.""" + summary, description = splitdoc(cls.get_doc()) + steps = cls.get_graph() + + if as_html: + summary = convert_markdown_to_html(summary) + description = convert_markdown_to_html(description) + for step in steps: + step["doc"] = convert_markdown_to_html(step["doc"]) + + return { + "summary": summary, + "description": description, + "steps": steps, + "available_groups": cls.get_available_groups(), + } + + @classmethod + def get_summary(cls): + """Get the doc string summary.""" + return cls.get_info()["summary"] + + @classmethod + def get_available_groups(cls): + return sorted( + set( + group_name + for step in cls.get_steps() + for group_name in getattr(step, "groups", []) + ) + ) diff --git a/pyproject-pipeline.toml b/pyproject-pipeline.toml new file mode 100644 index 000000000..b494b0f29 --- /dev/null +++ b/pyproject-pipeline.toml @@ -0,0 +1,59 @@ +[build-system] +requires = [ "flot>=0.7.0" ] +build-backend = "flot.buildapi" + +[project] +name = "aboutcode-pipeline" +version = "1.0.0" +description = "" +license = { text = "Apache-2.0" } +# TODO consider a readme? +# readme = "README.rst" +requires-python = ">=3.8" +authors = [ + { name = "nexB. Inc. and others", email = "info@aboutcode.org" }, +] +keywords = [ + "open source", + "pipeline", + "scancode", +] + +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development", + "Topic :: Utilities", +] + +dependencies = [ + "bleach>=6.1.0", + "markdown-it-py>=3.0.0", +] + + +[project.urls] +Homepage = "https://github.com/nexB/scancode.io" + + +[tool.flot] + +includes = [ + "pipeline/__init__.py", +] + +metadata_files = [ + "*.LICENSE", + "NOTICE", + "CHANGELOG.rst", + "CODE_OF_CONDUCT.rst", + "README-pipeline.rst", +] + diff --git a/scanpipe/forms.py b/scanpipe/forms.py index 6ed55f6b1..66e2a865b 100644 --- a/scanpipe/forms.py +++ b/scanpipe/forms.py @@ -28,9 +28,9 @@ from taggit.forms import TagField from taggit.forms import TagWidget +from pipeline import convert_markdown_to_html from scanpipe.models import Project from scanpipe.models import Run -from scanpipe.pipelines import convert_markdown_to_html from scanpipe.pipes import fetch scanpipe_app = apps.get_app_config("scanpipe") diff --git a/scanpipe/pipelines/__init__.py b/scanpipe/pipelines/__init__.py index 44b20cf8b..61ca984aa 100644 --- a/scanpipe/pipelines/__init__.py +++ b/scanpipe/pipelines/__init__.py @@ -26,17 +26,11 @@ from contextlib import contextmanager from functools import wraps from pathlib import Path -from pydoc import getdoc -from pydoc import splitdoc -from timeit import default_timer as timer -from django.utils import timezone - -import bleach -from markdown_it import MarkdownIt from pyinstrument import Profiler -from scanpipe import humanize_time +from pipeline import BasePipeline +from pipeline import BasePipelineRun logger = logging.getLogger(__name__) @@ -55,187 +49,112 @@ def _generate_message(self): return message -def group(*groups): - """Mark a function as part of a particular group.""" - - def decorator(obj): - if hasattr(obj, "groups"): - obj.groups = obj.groups.union(groups) - else: - setattr(obj, "groups", set(groups)) - return obj - - return decorator - - -def convert_markdown_to_html(markdown_text): - """Convert Markdown text to sanitized HTML.""" - # Using the "js-default" for safety. - html_content = MarkdownIt("js-default").renderInline(markdown_text) - # Sanitize HTML using bleach. - sanitized_html = bleach.clean(html_content) - return sanitized_html +class CommonStepsMixin: + """Common steps available on all project pipelines.""" + def flag_empty_files(self): + """Flag empty files.""" + from scanpipe.pipes import flag -class BasePipeline: - """Base class for all pipelines.""" + flag.flag_empty_files(self.project) - # Flag specifying whether to download missing inputs as an initial step. - download_inputs = True - # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first. - is_addon = False - # Optional URL that targets a view of the results relative to this Pipeline. - # This URL may contain dictionary-style string formatting, which will be - # interpolated against the project's field attributes. - # For example, you could use results_url="/project/{slug}/packages/?filter=value" - # to target the Package list view with an active filtering. - results_url = "" + def flag_ignored_resources(self): + """Flag ignored resources based on Project ``ignored_patterns`` setting.""" + from scanpipe.pipes import flag - def __init__(self, run): - """Load the Run and Project instances.""" - self.run = run - self.project = run.project - self.pipeline_name = run.pipeline_name - self.env = self.project.get_env() + if ignored_patterns := self.env.get("ignored_patterns"): + flag.flag_ignored_patterns(self.project, patterns=ignored_patterns) - @classmethod - def steps(cls): - raise NotImplementedError + def extract_archive(self, location, target): + """Extract archive at `location` to `target`. Save errors as messages.""" + from scanpipe.pipes import scancode - @classmethod - def get_steps(cls, groups=None): - """ - Return the list of steps defined in the ``steps`` class method. + extract_errors = scancode.extract_archive(location, target) - If the optional ``groups`` parameter is provided, only include steps labeled - with groups that intersect with the provided list. If a step has no groups or - if ``groups`` is not specified, include the step in the result. - """ - if not callable(cls.steps): - raise TypeError("Use a ``steps(cls)`` classmethod to declare the steps.") + for resource_location, errors in extract_errors.items(): + resource_path = Path(resource_location) - steps = cls.steps() + if resource_path.is_relative_to(self.project.codebase_path): + resource_path = resource_path.relative_to(self.project.codebase_path) + details = {"resource_path": str(resource_path)} + elif resource_path.is_relative_to(self.project.input_path): + resource_path = resource_path.relative_to(self.project.input_path) + details = {"path": f"input/{str(resource_path)}"} + else: + details = {"filename": str(resource_path.name)} - if groups is not None: - steps = tuple( - step - for step in steps - if not getattr(step, "groups", []) - or set(getattr(step, "groups")).intersection(groups) + self.project.add_error( + description="\n".join(errors), + model="extract_archive", + details=details, ) - return steps - - @classmethod - def get_doc(cls): - """Get the doc string of this pipeline.""" - return getdoc(cls) - - @classmethod - def get_graph(cls): - """Return a graph of steps.""" - return [ - { - "name": step.__name__, - "doc": getdoc(step), - "groups": getattr(step, "groups", []), - } - for step in cls.get_steps() - ] + def extract_archives(self, location=None): + """Extract archives located in the codebase/ directory with extractcode.""" + from scanpipe.pipes import scancode - @classmethod - def get_info(cls, as_html=False): - """Get a dictionary of combined information data about this pipeline.""" - summary, description = splitdoc(cls.get_doc()) - steps = cls.get_graph() - - if as_html: - summary = convert_markdown_to_html(summary) - description = convert_markdown_to_html(description) - for step in steps: - step["doc"] = convert_markdown_to_html(step["doc"]) - - return { - "summary": summary, - "description": description, - "steps": steps, - "available_groups": cls.get_available_groups(), - } + if not location: + location = self.project.codebase_path - @classmethod - def get_summary(cls): - """Get the doc string summary.""" - return cls.get_info()["summary"] + extract_errors = scancode.extract_archives(location=location, recurse=True) - @classmethod - def get_available_groups(cls): - return sorted( - set( - group_name - for step in cls.get_steps() - for group_name in getattr(step, "groups", []) + for resource_path, errors in extract_errors.items(): + self.project.add_error( + description="\n".join(errors), + model="extract_archives", + details={"resource_path": resource_path}, ) - ) - def log(self, message): - """Log the given `message` to the current module logger and Run instance.""" - now_as_localtime = timezone.localtime(timezone.now()) - timestamp = now_as_localtime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-4] - message = f"{timestamp} {message}" - logger.info(message) - self.run.append_to_log(message) + # Reload the project env post-extraction as the scancode-config.yml file + # may be located in one of the extracted archives. + self.env = self.project.get_env() - @staticmethod - def output_from_exception(exception): - """Return a formatted error message including the traceback.""" - output = f"{exception}\n\n" - if exception.__cause__ and str(exception.__cause__) != str(exception): - output += f"Cause: {exception.__cause__}\n\n" +class ProjectPipelineRun(BasePipelineRun): + def __init__(self, run_instance): + """Load the Pipeline execution context from a Run database object.""" + self.run = run_instance + self.project = run_instance.project + self.env = self.project.get_env() - traceback_formatted = "".join(traceback.format_tb(exception.__traceback__)) - output += f"Traceback:\n{traceback_formatted}" + self.pipeline_class = run_instance.pipeline_class + self.pipeline_name = run_instance.pipeline_name - return output + self.selected_groups = run_instance.selected_groups + self.selected_steps = run_instance.selected_steps - def execute(self): - """Execute each steps in the order defined on this pipeline class.""" - self.log(f"Pipeline [{self.pipeline_name}] starting") + def append_to_log(self, message): + self.run.append_to_log(message) - steps = self.get_steps(groups=self.run.selected_groups) - selected_steps = self.run.selected_steps + def set_current_step(self, message): + self.run.set_current_step(message) - if self.download_inputs: - steps = (self.__class__.download_missing_inputs,) + steps - steps_count = len(steps) - pipeline_start_time = timer() +class Pipeline(CommonStepsMixin, BasePipeline, ProjectPipelineRun): + """Main class for all project related pipelines including common steps methods.""" - for current_index, step in enumerate(steps, start=1): - step_name = step.__name__ + # Project wrapper ProjectPipelineRun class + # run_class = ProjectPipelineRun - if selected_steps and step_name not in selected_steps: - self.log(f"Step [{step_name}] skipped") - continue - - self.run.set_current_step(f"{current_index}/{steps_count} {step_name}") - self.log(f"Step [{step_name}] starting") - step_start_time = timer() + # Flag specifying whether to download missing inputs as an initial step. + download_inputs = True - try: - step(self) - except Exception as exception: - self.log("Pipeline failed") - return 1, self.output_from_exception(exception) + # Optional URL that targets a view of the results relative to this Pipeline. + # This URL may contain dictionary-style string formatting, which will be + # interpolated against the project's field attributes. + # For example, you could use results_url="/project/{slug}/packages/?filter=value" + # to target the Package list view with an active filtering. + results_url = "" - step_run_time = timer() - step_start_time - self.log(f"Step [{step_name}] completed in {humanize_time(step_run_time)}") + @classmethod + def get_steps(cls, groups=None): + """Inject the ``download_inputs`` step if enabled.""" + steps = super().get_steps(groups) - self.run.set_current_step("") # Reset the `current_step` field on completion - pipeline_run_time = timer() - pipeline_start_time - self.log(f"Pipeline completed in {humanize_time(pipeline_run_time)}") + if cls.download_inputs: + steps = (cls.download_missing_inputs,) + steps - return 0, "" + return steps def download_missing_inputs(self): """ @@ -297,67 +216,6 @@ def save_errors(self, *exceptions, **kwargs): self.add_error(exception=error, **kwargs) -class Pipeline(BasePipeline): - """Main class for all pipelines including common step methods.""" - - def flag_empty_files(self): - """Flag empty files.""" - from scanpipe.pipes import flag - - flag.flag_empty_files(self.project) - - def flag_ignored_resources(self): - """Flag ignored resources based on Project ``ignored_patterns`` setting.""" - from scanpipe.pipes import flag - - if ignored_patterns := self.env.get("ignored_patterns"): - flag.flag_ignored_patterns(self.project, patterns=ignored_patterns) - - def extract_archive(self, location, target): - """Extract archive at `location` to `target`. Save errors as messages.""" - from scanpipe.pipes import scancode - - extract_errors = scancode.extract_archive(location, target) - - for resource_location, errors in extract_errors.items(): - resource_path = Path(resource_location) - - if resource_path.is_relative_to(self.project.codebase_path): - resource_path = resource_path.relative_to(self.project.codebase_path) - details = {"resource_path": str(resource_path)} - elif resource_path.is_relative_to(self.project.input_path): - resource_path = resource_path.relative_to(self.project.input_path) - details = {"path": f"input/{str(resource_path)}"} - else: - details = {"filename": str(resource_path.name)} - - self.project.add_error( - description="\n".join(errors), - model="extract_archive", - details=details, - ) - - def extract_archives(self, location=None): - """Extract archives located in the codebase/ directory with extractcode.""" - from scanpipe.pipes import scancode - - if not location: - location = self.project.codebase_path - - extract_errors = scancode.extract_archives(location=location, recurse=True) - - for resource_path, errors in extract_errors.items(): - self.project.add_error( - description="\n".join(errors), - model="extract_archives", - details={"resource_path": resource_path}, - ) - - # Reload the project env post-extraction as the scancode-config.yml file - # may be located in one of the extracted archives. - self.env = self.project.get_env() - - def is_pipeline(obj): """ Return True if the `obj` is a subclass of `Pipeline` except for the diff --git a/scanpipe/pipelines/deploy_to_develop.py b/scanpipe/pipelines/deploy_to_develop.py index db39fea3a..e5d9b4a03 100644 --- a/scanpipe/pipelines/deploy_to_develop.py +++ b/scanpipe/pipelines/deploy_to_develop.py @@ -20,9 +20,9 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. +from pipeline import group from scanpipe import pipes from scanpipe.pipelines import Pipeline -from scanpipe.pipelines import group from scanpipe.pipes import d2d from scanpipe.pipes import flag from scanpipe.pipes import matchcode diff --git a/scanpipe/pipelines/inspect_packages.py b/scanpipe/pipelines/inspect_packages.py index e89d9fdea..12306ea8d 100644 --- a/scanpipe/pipelines/inspect_packages.py +++ b/scanpipe/pipelines/inspect_packages.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipelines import group +from pipeline import group from scanpipe.pipelines.scan_codebase import ScanCodebase from scanpipe.pipes import scancode diff --git a/scanpipe/pipelines/resolve_dependencies.py b/scanpipe/pipelines/resolve_dependencies.py index 706558771..9a1132c24 100644 --- a/scanpipe/pipelines/resolve_dependencies.py +++ b/scanpipe/pipelines/resolve_dependencies.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipelines import group +from pipeline import group from scanpipe.pipelines.scan_codebase import ScanCodebase from scanpipe.pipes import resolve from scanpipe.pipes import scancode diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py index 1f52270df..7dd4b1b9c 100644 --- a/scanpipe/pipes/__init__.py +++ b/scanpipe/pipes/__init__.py @@ -29,11 +29,9 @@ from datetime import datetime from itertools import islice from pathlib import Path -from timeit import default_timer as timer from django.db.models import Count -from scanpipe import humanize_time from scanpipe.models import AbstractTaskFieldsModel from scanpipe.models import CodebaseRelation from scanpipe.models import CodebaseResource @@ -402,78 +400,6 @@ def get_bin_executable(filename): return str(Path(sys.executable).parent / filename) -class LoopProgress: - """ - A context manager for logging progress in loops. - - Usage:: - - total_iterations = 100 - logger = print # Replace with your actual logger function - - progress = LoopProgress(total_iterations, logger, progress_step=10) - for item in progress.iter(iterator): - "Your processing logic here" - - with LoopProgress(total_iterations, logger, progress_step=10) as progress: - for item in progress.iter(iterator): - "Your processing logic here" - """ - - def __init__(self, total_iterations, logger, progress_step=10): - self.total_iterations = total_iterations - self.logger = logger - self.progress_step = progress_step - self.start_time = timer() - self.last_logged_progress = 0 - self.current_iteration = 0 - - def get_eta(self, current_progress): - run_time = timer() - self.start_time - return round(run_time / current_progress * (100 - current_progress)) - - @property - def current_progress(self): - return int((self.current_iteration / self.total_iterations) * 100) - - @property - def eta(self): - run_time = timer() - self.start_time - return round(run_time / self.current_progress * (100 - self.current_progress)) - - def log_progress(self): - reasons_to_skip = [ - not self.logger, - not self.current_iteration > 0, - self.total_iterations <= self.progress_step, - ] - if any(reasons_to_skip): - return - - if self.current_progress >= self.last_logged_progress + self.progress_step: - msg = ( - f"Progress: {self.current_progress}% " - f"({self.current_iteration}/{self.total_iterations})" - ) - if eta := self.eta: - msg += f" ETA: {humanize_time(eta)}" - - self.logger(msg) - self.last_logged_progress = self.current_progress - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - def iter(self, iterator): - for item in iterator: - self.current_iteration += 1 - self.log_progress() - yield item - - def get_text_str_diff_ratio(str_a, str_b): """ Return a similarity ratio as a float between 0 and 1 by comparing the diff --git a/scanpipe/pipes/d2d.py b/scanpipe/pipes/d2d.py index e65690997..e2792e2b8 100644 --- a/scanpipe/pipes/d2d.py +++ b/scanpipe/pipes/d2d.py @@ -44,11 +44,11 @@ from packagedcode.npm import NpmPackageJsonHandler from summarycode.classify import LEGAL_STARTS_ENDS +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseRelation from scanpipe.models import CodebaseResource from scanpipe.models import convert_glob_to_django_regex -from scanpipe.pipes import LoopProgress from scanpipe.pipes import flag from scanpipe.pipes import get_resource_diff_ratio from scanpipe.pipes import js diff --git a/scanpipe/pipes/purldb.py b/scanpipe/pipes/purldb.py index 5ed1962ff..677a9be65 100644 --- a/scanpipe/pipes/purldb.py +++ b/scanpipe/pipes/purldb.py @@ -32,7 +32,7 @@ from univers.version_range import RANGE_CLASS_BY_SCHEMES from univers.version_range import InvalidVersionRange -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress from scanpipe.pipes import _clean_package_data from scanpipe.pipes import poll_until_success diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index 9cb708284..d9e93b6f3 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -45,6 +45,7 @@ from scancode import cli as scancode_cli from scancode.cli import run_scan as scancode_run_scan +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseResource from scanpipe.models import DiscoveredDependency @@ -308,7 +309,7 @@ def scan_resources( resource_count = resource_qs.count() logger.info(f"Scan {resource_count} codebase resources with {scan_func.__name__}") resource_iterator = resource_qs.iterator(chunk_size=2000) - progress = pipes.LoopProgress(resource_count, logger=progress_logger) + progress = LoopProgress(resource_count, logger=progress_logger) max_workers = get_max_workers(keep_available=1) if max_workers <= 0: diff --git a/scanpipe/pipes/strings.py b/scanpipe/pipes/strings.py index 2d58c616e..7da0cd762 100644 --- a/scanpipe/pipes/strings.py +++ b/scanpipe/pipes/strings.py @@ -20,7 +20,7 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress class XgettextNotFound(Exception): diff --git a/scanpipe/pipes/symbols.py b/scanpipe/pipes/symbols.py index 4ef2724f6..32970eca0 100644 --- a/scanpipe/pipes/symbols.py +++ b/scanpipe/pipes/symbols.py @@ -22,7 +22,7 @@ from django.db.models import Q -from scanpipe.pipes import LoopProgress +from pipeline import LoopProgress class UniversalCtagsNotFound(Exception): diff --git a/scanpipe/tests/pipelines/do_nothing.py b/scanpipe/tests/pipelines/do_nothing.py index 01d8c8f91..91ed203ce 100644 --- a/scanpipe/tests/pipelines/do_nothing.py +++ b/scanpipe/tests/pipelines/do_nothing.py @@ -30,6 +30,8 @@ class DoNothing(Pipeline): Description section of the doc string. """ + download_inputs = False + @classmethod def steps(cls): return ( diff --git a/scanpipe/tests/pipelines/profile_step.py b/scanpipe/tests/pipelines/profile_step.py index 42135677b..06022d1e1 100644 --- a/scanpipe/tests/pipelines/profile_step.py +++ b/scanpipe/tests/pipelines/profile_step.py @@ -27,6 +27,8 @@ class ProfileStep(Pipeline): """Profile a step using the @profile decorator.""" + download_inputs = False + @classmethod def steps(cls): return (cls.step,) diff --git a/scanpipe/tests/pipelines/raise_exception.py b/scanpipe/tests/pipelines/raise_exception.py index b9a71c656..75cdd425c 100644 --- a/scanpipe/tests/pipelines/raise_exception.py +++ b/scanpipe/tests/pipelines/raise_exception.py @@ -26,6 +26,8 @@ class RaiseException(Pipeline): """Raise an Exception.""" + download_inputs = False + @classmethod def steps(cls): return (cls.raise_exception_step,) diff --git a/scanpipe/tests/pipelines/register_from_file.py b/scanpipe/tests/pipelines/register_from_file.py index c07914fd3..6dd84420e 100644 --- a/scanpipe/tests/pipelines/register_from_file.py +++ b/scanpipe/tests/pipelines/register_from_file.py @@ -26,6 +26,8 @@ class RegisterFromFile(DoNothing): """Register from its file path.""" + download_inputs = False + @classmethod def steps(cls): return (cls.step1,) diff --git a/scanpipe/tests/pipelines/steps_as_attribute.py b/scanpipe/tests/pipelines/steps_as_attribute.py index b9853dce2..2755c5e58 100644 --- a/scanpipe/tests/pipelines/steps_as_attribute.py +++ b/scanpipe/tests/pipelines/steps_as_attribute.py @@ -26,6 +26,8 @@ class StepsAsAttribute(Pipeline): """Declare steps as attribute.""" + download_inputs = False + def step1(self): return diff --git a/scanpipe/tests/pipelines/with_groups.py b/scanpipe/tests/pipelines/with_groups.py index 1b00e3f16..32b4f471d 100644 --- a/scanpipe/tests/pipelines/with_groups.py +++ b/scanpipe/tests/pipelines/with_groups.py @@ -20,13 +20,15 @@ # ScanCode.io is a free software code scanning tool from nexB Inc. and others. # Visit https://github.com/nexB/scancode.io for support and download. +from pipeline import group from scanpipe.pipelines import Pipeline -from scanpipe.pipelines import group class WithGroups(Pipeline): """Include "grouped" steps.""" + download_inputs = False + @classmethod def steps(cls): return ( diff --git a/scanpipe/tests/pipes/test_pipes.py b/scanpipe/tests/pipes/test_pipes.py index 5af6d4518..d59a1dc7f 100644 --- a/scanpipe/tests/pipes/test_pipes.py +++ b/scanpipe/tests/pipes/test_pipes.py @@ -28,6 +28,7 @@ from django.test import TestCase from django.test import TransactionTestCase +from pipeline import LoopProgress from scanpipe import pipes from scanpipe.models import CodebaseResource from scanpipe.models import DiscoveredPackage @@ -348,14 +349,14 @@ def test_scanpipe_loop_progress_as_context_manager(self): buffer = io.StringIO() logger = buffer.write - progress = pipes.LoopProgress(total_iterations, logger, progress_step=10) + progress = LoopProgress(total_iterations, logger, progress_step=10) for _ in progress.iter(range(total_iterations)): pass self.assertEqual(expected, buffer.getvalue()) buffer = io.StringIO() logger = buffer.write - with pipes.LoopProgress(total_iterations, logger, progress_step) as progress: + with LoopProgress(total_iterations, logger, progress_step) as progress: for _ in progress.iter(range(total_iterations)): pass self.assertEqual(expected, buffer.getvalue()) diff --git a/scanpipe/tests/test_pipelines.py b/scanpipe/tests/test_pipelines.py index aef621ece..5fba6104a 100644 --- a/scanpipe/tests/test_pipelines.py +++ b/scanpipe/tests/test_pipelines.py @@ -152,10 +152,10 @@ def test_scanpipe_pipeline_class_execute_with_selected_steps(self, step2, step1) project1 = Project.objects.create(name="Analysis") run = project1.add_pipeline("do_nothing") - pipeline = run.make_pipeline_instance() run.selected_steps = ["step2", "not_existing_step"] run.save() + pipeline = run.make_pipeline_instance() exitcode, out = pipeline.execute() self.assertEqual(0, exitcode) @@ -174,6 +174,7 @@ def test_scanpipe_pipeline_class_execute_with_selected_steps(self, step2, step1) def test_scanpipe_pipeline_class_download_inputs_attribute(self): project1 = Project.objects.create(name="Analysis") run = project1.add_pipeline("do_nothing") + run.pipeline_class.download_inputs = True pipeline = run.make_pipeline_instance() self.assertTrue(pipeline.download_inputs) pipeline.execute() @@ -181,7 +182,7 @@ def test_scanpipe_pipeline_class_download_inputs_attribute(self): run = project1.add_pipeline("do_nothing") pipeline = run.make_pipeline_instance() - pipeline.download_inputs = False + run.pipeline_class.download_inputs = False pipeline.execute() self.assertNotIn("Step [download_missing_inputs]", run.log)