From c8d334466988a758bd7921c9d4b4a43a80600b9d Mon Sep 17 00:00:00 2001 From: dervoeti Date: Tue, 19 Nov 2024 22:04:35 +0100 Subject: [PATCH] fix: improved dependency tree handling --- .../parsers/cyclone_dx/dependencies.py | 17 +--- .../parsers/cyclone_dx/parser.py | 86 +++++++++++-------- 2 files changed, 55 insertions(+), 48 deletions(-) diff --git a/backend/application/import_observations/parsers/cyclone_dx/dependencies.py b/backend/application/import_observations/parsers/cyclone_dx/dependencies.py index 854d63daf..57428ec0c 100644 --- a/backend/application/import_observations/parsers/cyclone_dx/dependencies.py +++ b/backend/application/import_observations/parsers/cyclone_dx/dependencies.py @@ -10,7 +10,7 @@ def get_component_dependencies( data: dict, components: dict[str, Component], component: Component, - component_dependency_paths: dict[str, list[list[str]]], + component_dependency_paths: dict[str, list[str]], ) -> tuple[str, list[dict]]: component_dependencies: list[dict[str, str | list[str]]] = [] @@ -28,19 +28,8 @@ def get_component_dependencies( observation_component_dependencies = "" paths = component_dependency_paths.get(component.bom_ref, []) - seen_relations = set() - for path in paths: - for i, node in enumerate(path): - if i == 0: - parent = node - continue - - relation = f"{_translate_component(parent, components)} --> {_translate_component(node, components)}\n" - - parent = node - if relation not in seen_relations: - observation_component_dependencies += relation - seen_relations.add(relation) + for edge in paths: + observation_component_dependencies += f"{edge}\n" if len(observation_component_dependencies) > 32768: observation_component_dependencies = ( diff --git a/backend/application/import_observations/parsers/cyclone_dx/parser.py b/backend/application/import_observations/parsers/cyclone_dx/parser.py index 40d24f802..9f23f3a9b 100644 --- a/backend/application/import_observations/parsers/cyclone_dx/parser.py +++ 
b/backend/application/import_observations/parsers/cyclone_dx/parser.py @@ -9,7 +9,7 @@ from django.core.files.base import File from trycast import trycast -from application.core.models import Branch, Observation +from application.core.models import Observation from application.core.types import Severity from application.import_observations.parsers.base_parser import ( BaseFileParser, @@ -204,6 +204,18 @@ def _get_component(self, component_data: dict[str, Any]) -> Optional[Component]: unknown_license=", ".join(unknown_licenses), ) + def _translate_component(self, bom_ref: str) -> str: + component = self.components.get(bom_ref, None) + if not component: + return "" + + if component.version: + component_name_version = f"{component.name}:{component.version}" + else: + component_name_version = component.name + + return component_name_version + def _create_observations( # pylint: disable=too-many-locals self, data: dict, @@ -217,38 +229,45 @@ def _create_observations( # pylint: disable=too-many-locals dependencies = sbom_data.get("dependencies", []) - # Find the root components, meaning: Components that no other components depend on - roots = set() - nonroots = set() - for dep in dependencies: - for dep_on in dep.get("dependsOn", []): - nonroots.add(dep_on) - roots.add(dep.get("ref")) - roots = roots - nonroots - - # Create a map of dependencies for each component - dep_map = { - entry["ref"]: entry.get("dependsOn", []) - for entry in sbom_data.get("dependencies", []) - } - - dependency_paths = defaultdict(list) - - # Traverse the dependency tree from each root component - # While doing that, accumulate all paths from each root to each leaf - def traverse(node, path): - # Avoid indirect cycles - if node in path: - return - - print(f"Traversing {node} with path {path}") - dependency_paths[node].append(path) - for dep in dep_map.get(node, []): - if dep not in path: # Avoid direct cycles - traverse(dep, path + [dep]) - - for root in roots: - traverse(root, [root]) + 
reverse_dep_map = defaultdict(list) + for entry in dependencies: + for dep in entry.get("dependsOn", []): + reverse_dep_map[dep].append( + entry["ref"] + ) # Add a relation from the dependency to its "parent" + + relevant_components = set() + for vulnerability in data.get("vulnerabilities", []): + for affected in vulnerability.get("affects", []): + ref = affected.get("ref") + if ref: + component = self.components.get(ref) + if component: + relevant_components.add(component.bom_ref) + + dependency_paths: dict[str, list[str]] = defaultdict(list) + + # Get all paths from the root components in the dependency tree to the relevant components + for relevant_component in relevant_components: + stack: list[tuple[str, Optional[str]]] = [(relevant_component, None)] + visited = set() + if relevant_component not in dependency_paths: + dependency_paths[relevant_component] = [] + while stack: + current, previous = stack.pop() + if not current: + continue + + if previous: + path = f"{self._translate_component(current)} --> {self._translate_component(previous)}" + if path not in dependency_paths[relevant_component]: + dependency_paths[relevant_component].append(path) + if current in visited: + continue + visited.add(current) + if current in reverse_dep_map: + for parent in reverse_dep_map[current]: + stack.append((parent, current)) for vulnerability in data.get("vulnerabilities", []): vulnerability_id = vulnerability.get("id") @@ -268,7 +287,6 @@ def traverse(node, path): component = self.components.get(ref) print(f"Processing vulnerability: {vulnerability_id}") if component: - print(f"Found component for: {vulnerability_id}") title = vulnerability_id if component.bom_ref in component_dependencies_cache: