Skip to content

Commit

Permalink
fix: improved dependency tree handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dervoeti committed Nov 19, 2024
1 parent 00cf0c2 commit c8d3344
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def get_component_dependencies(
data: dict,
components: dict[str, Component],
component: Component,
component_dependency_paths: dict[str, list[list[str]]],
component_dependency_paths: dict[str, list[str]],
) -> tuple[str, list[dict]]:
component_dependencies: list[dict[str, str | list[str]]] = []

Expand All @@ -28,19 +28,8 @@ def get_component_dependencies(
observation_component_dependencies = ""

paths = component_dependency_paths.get(component.bom_ref, [])
seen_relations = set()
for path in paths:
for i, node in enumerate(path):
if i == 0:
parent = node
continue

relation = f"{_translate_component(parent, components)} --> {_translate_component(node, components)}\n"

parent = node
if relation not in seen_relations:
observation_component_dependencies += relation
seen_relations.add(relation)
for edge in paths:
observation_component_dependencies += f"{edge}\n"

if len(observation_component_dependencies) > 32768:
observation_component_dependencies = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django.core.files.base import File
from trycast import trycast

from application.core.models import Branch, Observation
from application.core.models import Observation
from application.core.types import Severity
from application.import_observations.parsers.base_parser import (
BaseFileParser,
Expand Down Expand Up @@ -204,6 +204,18 @@ def _get_component(self, component_data: dict[str, Any]) -> Optional[Component]:
unknown_license=", ".join(unknown_licenses),
)

def _translate_component(self, bom_ref: str) -> str:
component = self.components.get(bom_ref, None)
if not component:
return ""

if component.version:
component_name_version = f"{component.name}:{component.version}"
else:
component_name_version = component.name

return component_name_version

def _create_observations( # pylint: disable=too-many-locals
self,
data: dict,
Expand All @@ -217,38 +229,45 @@ def _create_observations( # pylint: disable=too-many-locals

dependencies = sbom_data.get("dependencies", [])

# Find the root components, meaning: Components that no other components depend on
roots = set()
nonroots = set()
for dep in dependencies:
for dep_on in dep.get("dependsOn", []):
nonroots.add(dep_on)
roots.add(dep.get("ref"))
roots = roots - nonroots

# Create a map of dependencies for each component
dep_map = {
entry["ref"]: entry.get("dependsOn", [])
for entry in sbom_data.get("dependencies", [])
}

dependency_paths = defaultdict(list)

# Traverse the dependency tree from each root component
# While doing that, accumulate all paths from each root to each leaf
def traverse(node, path):
# Avoid indirect cycles
if node in path:
return

print(f"Traversing {node} with path {path}")
dependency_paths[node].append(path)
for dep in dep_map.get(node, []):
if dep not in path: # Avoid direct cycles
traverse(dep, path + [dep])

for root in roots:
traverse(root, [root])
reverse_dep_map = defaultdict(list)
for entry in dependencies:
for dep in entry.get("dependsOn", []):
reverse_dep_map[dep].append(
entry["ref"]
) # Add a relation from the dependency it's "parent"

relevant_components = set()
for vulnerability in data.get("vulnerabilities", []):
for affected in vulnerability.get("affects", []):
ref = affected.get("ref")
if ref:
component = self.components.get(ref)
if component:
relevant_components.add(component.bom_ref)

dependency_paths: dict[str, list[str]] = defaultdict(list)

# Get all paths from the root components in the dependency tree to the relevant components
for relevant_component in relevant_components:
stack: list[tuple[str, Optional[str]]] = [(relevant_component, None)]
visited = set()
if relevant_component not in dependency_paths:
dependency_paths[relevant_component] = []
while stack:
current, previous = stack.pop()
if not current:
continue

if previous:
path = f"{self._translate_component(current)} --> {self._translate_component(previous)}"
if path not in dependency_paths[relevant_component]:
dependency_paths[relevant_component].append(path)
if current in visited:
continue
visited.add(current)
if current in reverse_dep_map:
for parent in reverse_dep_map[current]:
stack.append((parent, current))

for vulnerability in data.get("vulnerabilities", []):
vulnerability_id = vulnerability.get("id")
Expand All @@ -268,7 +287,6 @@ def traverse(node, path):
component = self.components.get(ref)
print(f"Processing vulnerability: {vulnerability_id}")
if component:
print(f"Found component for: {vulnerability_id}")
title = vulnerability_id

if component.bom_ref in component_dependencies_cache:
Expand Down

0 comments on commit c8d3344

Please sign in to comment.