diff --git a/src/backend/base/langflow/components/git/git.py b/src/backend/base/langflow/components/git/git.py index 9a48aef6653d..d311c3912bcf 100644 --- a/src/backend/base/langflow/components/git/git.py +++ b/src/backend/base/langflow/components/git/git.py @@ -1,18 +1,20 @@ +import asyncio import re -import shutil import tempfile +from contextlib import asynccontextmanager from pathlib import Path +from typing import TYPE_CHECKING +import anyio from langchain_community.document_loaders.git import GitLoader from langflow.custom import Component -from langflow.io import ( - DropdownInput, - MessageTextInput, - Output, -) +from langflow.io import DropdownInput, MessageTextInput, Output from langflow.schema import Data +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + class GitLoaderComponent(Component): display_name = "Git" @@ -80,13 +82,23 @@ class GitLoaderComponent(Component): Output(name="data", display_name="Data", method="load_documents"), ] - _temp_clone_path: str | None = None - @staticmethod - def is_binary(file_path: str) -> bool: + async def is_binary(file_path: str | Path) -> bool: """Check if a file is binary by looking for null bytes.""" - with Path(file_path).open("rb") as file: - return b"\x00" in file.read(1024) + path = anyio.Path(file_path) + content = await path.read_bytes() + return b"\x00" in content[:1024] + + @asynccontextmanager + async def temp_clone_dir(self): + """Context manager for handling temporary clone directory.""" + temp_dir = None + try: + temp_dir = tempfile.mkdtemp(prefix="langflow_clone_") + yield temp_dir + finally: + if temp_dir: + await anyio.Path(temp_dir).remove() def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict: # Hide fields by default @@ -105,11 +117,11 @@ def update_build_config(self, build_config: dict, field_value: str, field_name: return build_config - def build_gitloader(self) -> GitLoader: + async def build_gitloader(self) -> GitLoader: file_filter_patterns = getattr(self, "file_filter", None) content_filter_pattern = getattr(self, "content_filter", None) - file_filters = [] + file_filters: list[Callable[[Path], bool] | Callable[[Path], Awaitable[bool]]] = [] if file_filter_patterns: patterns = [pattern.strip() for pattern in file_filter_patterns.split(",")] @@ -125,17 +137,27 @@ def file_filter(file_path: Path) -> bool: if content_filter_pattern: content_regex = re.compile(content_filter_pattern) - def content_filter(file_path: Path) -> bool: - content = file_path.read_text(encoding="utf-8", errors="ignore") + async def content_filter(file_path: Path) -> bool: + path = anyio.Path(file_path) + content = await path.read_text() return bool(content_regex.search(content)) file_filters.append(content_filter) - def combined_filter(file_path: str) -> bool: + async def combined_filter(file_path: str) -> bool: path = Path(file_path) - if self.is_binary(file_path): + if await self.is_binary(file_path): return False - return all(f(path) for f in file_filters) if file_filters else True + + results = [] + for f in file_filters: + if asyncio.iscoroutinefunction(f): + result = await f(path) + else: + result = f(path) + results.append(result) + + return all(results) if results else True repo_source = getattr(self, "repo_source", None) if repo_source == "Local": @@ -144,9 +166,8 @@ def combined_filter(file_path: str) -> bool: else: # Clone source clone_url = self.clone_url - # Generate a temporary directory for cloning - self._temp_clone_path = tempfile.mkdtemp(prefix="langflow_clone_") - repo_path = self._temp_clone_path + async with self.temp_clone_dir() as temp_dir: + repo_path = temp_dir return GitLoader( repo_path=repo_path, @@ -155,16 +176,9 @@ def combined_filter(file_path: str) -> bool: file_filter=combined_filter, ) - def load_documents(self) -> list[Data]: - gitloader = self.build_gitloader() + async def load_documents(self) -> list[Data]: + gitloader = await self.build_gitloader() documents = list(gitloader.lazy_load()) data = [Data.from_document(doc) for doc in documents] self.status = data - - # Cleanup after loading if cloned - repo_source = getattr(self, "repo_source", None) - if repo_source == "Remote" and self._temp_clone_path: - shutil.rmtree(self._temp_clone_path, ignore_errors=True) - self._temp_clone_path = None - return data