Skip to content

Commit

Permalink
feat(git.py): refactor GitLoaderComponent to support asynchronous operations and improve temporary directory management
Browse files (browse the repository at this point in the history)

- Convert methods to async to enhance performance with file operations.
- Implement async context manager for handling temporary clone directories.
- Update binary file check and content filtering to be asynchronous.
  • Loading branch information
ogabrielluiz committed Dec 20, 2024
1 parent b8c290a commit cfe13b4
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions src/backend/base/langflow/components/git/git.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import asyncio
import re
import shutil
import tempfile
from contextlib import asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING

import anyio
from langchain_community.document_loaders.git import GitLoader

from langflow.custom import Component
from langflow.io import (
DropdownInput,
MessageTextInput,
Output,
)
from langflow.io import DropdownInput, MessageTextInput, Output
from langflow.schema import Data

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable


class GitLoaderComponent(Component):
display_name = "Git"
Expand Down Expand Up @@ -80,13 +82,23 @@ class GitLoaderComponent(Component):
Output(name="data", display_name="Data", method="load_documents"),
]

_temp_clone_path: str | None = None

@staticmethod
def is_binary(file_path: str) -> bool:
async def is_binary(file_path: str | Path) -> bool:
"""Check if a file is binary by looking for null bytes."""
with Path(file_path).open("rb") as file:
return b"\x00" in file.read(1024)
path = anyio.Path(file_path)
content = await path.read_bytes()
return b"\x00" in content[:1024]

@asynccontextmanager
async def temp_clone_dir(self):
    """Yield a fresh temporary directory for cloning and delete it on exit.

    The directory is created with the ``langflow_clone_`` prefix and is
    removed recursively when the ``async with`` body finishes, whether it
    completes normally or raises.

    Yields:
        str: Path of the newly created temporary directory.
    """
    # Create the directory before entering ``try`` so that ``finally`` only
    # ever runs when there is actually something to clean up.
    temp_dir = tempfile.mkdtemp(prefix="langflow_clone_")
    try:
        yield temp_dir
    finally:
        # A clone directory is non-empty, so it needs a recursive delete;
        # ``anyio.Path`` exposes no such operation (and has no ``remove``).
        # ``shutil.rmtree`` is blocking, so run it in a worker thread to keep
        # the event loop responsive. Cleanup is best-effort: errors are
        # ignored so they never mask an exception raised by the body.
        await asyncio.to_thread(shutil.rmtree, temp_dir, ignore_errors=True)

def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None) -> dict:
# Hide fields by default
Expand All @@ -105,11 +117,11 @@ def update_build_config(self, build_config: dict, field_value: str, field_name:

return build_config

def build_gitloader(self) -> GitLoader:
async def build_gitloader(self) -> GitLoader:
file_filter_patterns = getattr(self, "file_filter", None)
content_filter_pattern = getattr(self, "content_filter", None)

file_filters = []
file_filters: list[Callable[[Path], bool] | Callable[[Path], Awaitable[bool]]] = []
if file_filter_patterns:
patterns = [pattern.strip() for pattern in file_filter_patterns.split(",")]

Expand All @@ -125,17 +137,27 @@ def file_filter(file_path: Path) -> bool:
if content_filter_pattern:
content_regex = re.compile(content_filter_pattern)

def content_filter(file_path: Path) -> bool:
content = file_path.read_text(encoding="utf-8", errors="ignore")
async def content_filter(file_path: Path) -> bool:
path = anyio.Path(file_path)
content = await path.read_text()
return bool(content_regex.search(content))

file_filters.append(content_filter)

def combined_filter(file_path: str) -> bool:
async def combined_filter(file_path: str) -> bool:
path = Path(file_path)
if self.is_binary(file_path):
if await self.is_binary(file_path):
return False
return all(f(path) for f in file_filters) if file_filters else True

results = []
for f in file_filters:
if asyncio.iscoroutinefunction(f):
result = await f(path)
else:
result = f(path)
results.append(result)

return all(results) if results else True

repo_source = getattr(self, "repo_source", None)
if repo_source == "Local":
Expand All @@ -144,9 +166,8 @@ def combined_filter(file_path: str) -> bool:
else:
# Clone source
clone_url = self.clone_url
# Generate a temporary directory for cloning
self._temp_clone_path = tempfile.mkdtemp(prefix="langflow_clone_")
repo_path = self._temp_clone_path
async with self.temp_clone_dir() as temp_dir:
repo_path = temp_dir

return GitLoader(
repo_path=repo_path,
Expand All @@ -155,16 +176,9 @@ def combined_filter(file_path: str) -> bool:
file_filter=combined_filter,
)

def load_documents(self) -> list[Data]:
gitloader = self.build_gitloader()
async def load_documents(self) -> list[Data]:
gitloader = await self.build_gitloader()
documents = list(gitloader.lazy_load())
data = [Data.from_document(doc) for doc in documents]
self.status = data

# Cleanup after loading if cloned
repo_source = getattr(self, "repo_source", None)
if repo_source == "Remote" and self._temp_clone_path:
shutil.rmtree(self._temp_clone_path, ignore_errors=True)
self._temp_clone_path = None

return data

0 comments on commit cfe13b4

Please sign in to comment.