Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async task scheduling with Job model and API endpoints #5421

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
d707672
Add APScheduler dependency for async task scheduling
ogabrielluiz Dec 23, 2024
cc17d07
feat: Add Job model for scheduled tasks and update exports
ogabrielluiz Dec 23, 2024
4180937
feat: Add migration script to create job table for scheduled tasks
ogabrielluiz Dec 23, 2024
653f039
feat: Add tasks API router and endpoints for task management
ogabrielluiz Dec 23, 2024
cb16a86
feat: Implement LangFlowJobStore for managing scheduled jobs
ogabrielluiz Dec 23, 2024
00a5c76
feat: Refactor job store and introduce async scheduler
ogabrielluiz Dec 23, 2024
c192c8d
feat: Update TaskServiceFactory to include SettingsService dependency
ogabrielluiz Dec 23, 2024
960eb37
feat: Enhance TaskService with async scheduling and job management
ogabrielluiz Dec 23, 2024
b8c2642
feat: Add unit tests for task management API
ogabrielluiz Dec 23, 2024
43319f6
feat: Refactor AsyncScheduler methods for improved type hinting and d…
ogabrielluiz Dec 23, 2024
f195b3f
chore: Update blockbuster package version to 1.5.7
ogabrielluiz Dec 23, 2024
5410def
feat: Initialize TaskService with async scheduler setup
ogabrielluiz Dec 23, 2024
89be9fb
feat: Initialize timezone support in AsyncScheduler constructor
ogabrielluiz Dec 23, 2024
b9ce8b8
feat: Add initialize method to Service class for async setup
ogabrielluiz Dec 23, 2024
c8d436f
feat: Add async setup method to ServiceManager for service initializa…
ogabrielluiz Dec 23, 2024
c9bd996
refactor: Rename initialize method to setup in Service class for clarity
ogabrielluiz Dec 23, 2024
f469cf7
refactor: Rename initialize method to setup in ServiceManager for imp…
ogabrielluiz Dec 23, 2024
d9cf913
feat: Implement async setup for TaskService with improved scheduler i…
ogabrielluiz Dec 23, 2024
0115fac
feat: Implement AsyncSQLModelJobStore for job management in Langflow
ogabrielluiz Dec 23, 2024
ebd2af2
feat: Enhance blockbuster function to include blocking for os.path.ab…
ogabrielluiz Dec 23, 2024
d6fc2ef
feat: Add JobStatus enum and enhance Job model with status, result, a…
ogabrielluiz Dec 23, 2024
5c20cbf
fix: Update ServiceManager to check for coroutine using inspect.iscor…
ogabrielluiz Dec 23, 2024
055cc5b
feat: Enhance AsyncSQLModelJobStore to handle flow and api_key_user I…
ogabrielluiz Dec 23, 2024
345f30e
test: Add unit tests for TaskService job handling and lifecycle
ogabrielluiz Dec 23, 2024
9355ce1
feat: Update TaskService to handle job status and error management wi…
ogabrielluiz Dec 23, 2024
147f1ab
fix: Update import path for AsyncSQLModelJobStore in scheduler module
ogabrielluiz Dec 23, 2024
7ddf551
test: Add unit tests for Job and JobRead models with various scenarios
ogabrielluiz Dec 23, 2024
b77e9ce
test: Add comprehensive unit tests for Job model serialization and ed…
ogabrielluiz Dec 23, 2024
2cc6348
refactor: Rename Task to Job in API and service layers for consistency
ogabrielluiz Dec 24, 2024
adfdeb3
fix: Deprecate /task endpoint and redirect to /jobs with warning message
ogabrielluiz Dec 24, 2024
934da2c
refactor: Replace tasks_router with jobs_router in API routing
ogabrielluiz Dec 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""add job table

Revision ID: a6faa131285d
Revises: e3162c1804e6
Create Date: 2024-12-23 10:54:57.844827

"""
from typing import Sequence, Union

import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector

from langflow.utils import migration

# revision identifiers, used by Alembic.
revision: str = 'a6faa131285d'
down_revision: Union[str, None] = 'e3162c1804e6'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn) # type: ignore
table_names = inspector.get_table_names()

# Create job table if it doesn't exist
if "job" not in table_names:
op.create_table(
"job",
sa.Column("id", sqlmodel.sql.sqltypes.AutoString(length=191), primary_key=True),
sa.Column("next_run_time", sa.DateTime(timezone=True), nullable=True),
sa.Column("job_state", sa.LargeBinary(), nullable=True),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("flow_id", sa.Uuid(), nullable=False),
sa.Column("user_id", sa.Uuid(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.true()),
sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.func.now(), onupdate=sa.func.now()),
sa.ForeignKeyConstraint(["flow_id"], ["flow.id"], name="fk_job_flow_id_flow", ondelete="CASCADE"),
sa.ForeignKeyConstraint(["user_id"], ["user.id"], name="fk_job_user_id_user", ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id", name="pk_job"),
)

# Create indices
with op.batch_alter_table("job", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_job_name"), ["name"], unique=False)
batch_op.create_index(batch_op.f("ix_job_flow_id"), ["flow_id"], unique=False)
batch_op.create_index(batch_op.f("ix_job_user_id"), ["user_id"], unique=False)


def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn) # type: ignore
table_names = inspector.get_table_names()

if "job" in table_names:
# Drop indices first
with op.batch_alter_table("job", schema=None) as batch_op:
batch_op.drop_index("ix_job_name")
batch_op.drop_index("ix_job_flow_id")
batch_op.drop_index("ix_job_user_id")

# Drop the table
op.drop_table("job")
2 changes: 2 additions & 0 deletions src/backend/base/langflow/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
files_router,
flows_router,
folders_router,
jobs_router,
login_router,
monitor_router,
starter_projects_router,
Expand All @@ -33,3 +34,4 @@
router.include_router(monitor_router)
router.include_router(folders_router)
router.include_router(starter_projects_router)
router.include_router(jobs_router)
2 changes: 2 additions & 0 deletions src/backend/base/langflow/api/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from langflow.api.v1.files import router as files_router
from langflow.api.v1.flows import router as flows_router
from langflow.api.v1.folders import router as folders_router
from langflow.api.v1.jobs import router as jobs_router
from langflow.api.v1.login import router as login_router
from langflow.api.v1.monitor import router as monitor_router
from langflow.api.v1.starter_projects import router as starter_projects_router
Expand All @@ -19,6 +20,7 @@
"files_router",
"flows_router",
"folders_router",
"jobs_router",
"login_router",
"monitor_router",
"starter_projects_router",
Expand Down
33 changes: 10 additions & 23 deletions src/backend/base/langflow/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from langflow.services.database.models.flow.model import FlowRead
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow
from langflow.services.database.models.user.model import User, UserRead
from langflow.services.deps import get_session_service, get_settings_service, get_task_service, get_telemetry_service
from langflow.services.deps import get_session_service, get_settings_service, get_telemetry_service
from langflow.services.settings.feature_flags import FEATURE_FLAGS
from langflow.services.telemetry.schema import RunPayload
from langflow.utils.version import get_version_info
Expand Down Expand Up @@ -507,28 +507,15 @@ async def process() -> None:


@router.get("/task/{task_id}")
async def get_task_status(task_id: str) -> TaskStatusResponse:
task_service = get_task_service()
task = task_service.get_task(task_id)
result = None
if task is None:
raise HTTPException(status_code=404, detail="Task not found")
if task.ready():
result = task.result
# If result isinstance of Exception, can we get the traceback?
if isinstance(result, Exception):
logger.exception(task.traceback)

if isinstance(result, dict) and "result" in result:
result = result["result"]
elif hasattr(result, "result"):
result = result.result

if task.status == "FAILURE":
result = str(task.result)
logger.error(f"Task {task_id} failed: {task.traceback}")

return TaskStatusResponse(status=task.status, result=result)
async def get_task_status(task_id: str) -> TaskStatusResponse: # noqa: ARG001
# Deprecate this endpoint
logger.warning(
"The /task endpoint is deprecated and will be removed in a future version. Please use /jobs instead."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="The /task endpoint is deprecated and will be removed in a future version. Please use /jobs instead.",
)


@router.post(
Expand Down
94 changes: 94 additions & 0 deletions src/backend/base/langflow/api/v1/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from loguru import logger
from pydantic import BaseModel, Field

from langflow.api.utils import CurrentActiveUser
from langflow.api.v1.endpoints import simple_run_flow_task
from langflow.api.v1.schemas import SimplifiedAPIRequest
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
from langflow.services.database.models.flow import Flow
from langflow.services.deps import get_task_service
from langflow.services.task.service import TaskService

router = APIRouter(prefix="/jobs", tags=["Jobs"])


class CreateJobRequest(BaseModel):
"""Request model for creating a task."""

name: str | None = None
input_request: SimplifiedAPIRequest = Field(..., description="Input request for the flow")


class TaskResponse(BaseModel):
"""Response model for task operations."""

id: str
name: str
pending: bool


@router.post("/{flow_id_or_name}", response_model=str)
async def create_job(
request: CreateJobRequest,
user: CurrentActiveUser,
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
) -> str:
"""Create a new job."""
try:
task_service = get_task_service()
return await task_service.create_job(
task_func=simple_run_flow_task,
run_at=None,
name=request.name,
kwargs={
"flow": flow,
"input_request": request.input_request,
"stream": False,
"api_key_user": user,
},
)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc


@router.get("/{task_id}", response_model=TaskResponse)
async def get_task(
task_id: str,
user: CurrentActiveUser,
) -> TaskResponse:
"""Get task information."""
task_service: TaskService = get_task_service()
task = await task_service.get_job(task_id, user.id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
logger.info(f"Task: {task}")
return TaskResponse.model_validate(task, from_attributes=True)


@router.get("/", response_model=list[TaskResponse])
async def get_tasks(
user: CurrentActiveUser,
task_service: Annotated[TaskService, Depends(get_task_service)],
pending: bool | None = None,
) -> list[TaskResponse]:
"""Get all tasks for the current user."""
tasks = await task_service.get_jobs(user_id=user.id, pending=pending)
return [TaskResponse.model_validate(task, from_attributes=True) for task in tasks]


@router.delete("/{task_id}")
async def cancel_task(
task_id: str,
user: CurrentActiveUser,
task_service: Annotated[TaskService, Depends(get_task_service)],
) -> bool:
"""Cancel a task."""
success = await task_service.cancel_job(task_id, user.id)
if not success:
raise HTTPException(status_code=404, detail="Task not found")
return True
3 changes: 3 additions & 0 deletions src/backend/base/langflow/services/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ def get_schema(self):
async def teardown(self) -> None:
return

async def setup(self) -> None:
return

def set_ready(self) -> None:
self.ready = True
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .api_key import ApiKey
from .flow import Flow
from .folder import Folder
from .job import Job
from .message import MessageTable
from .transactions import TransactionTable
from .user import User
from .variable import Variable

__all__ = ["ApiKey", "Flow", "Folder", "MessageTable", "TransactionTable", "User", "Variable"]
__all__ = ["ApiKey", "Flow", "Folder", "Job", "MessageTable", "TransactionTable", "User", "Variable"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .model import Job

__all__ = ["Job"]
60 changes: 60 additions & 0 deletions src/backend/base/langflow/services/database/models/job/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from datetime import datetime
from enum import Enum
from uuid import UUID

import sqlalchemy as sa
from sqlmodel import JSON, Boolean, Column, DateTime, Field, SQLModel


class JobStatus(str, Enum):
"""Job status enum."""

PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"


class Job(SQLModel, table=True): # type: ignore[call-arg]
"""Model for storing scheduled jobs.

This model extends APScheduler's job table with additional metadata for Langflow.
The core APScheduler fields (id, next_run_time, job_state) are used directly by APScheduler,
while the additional fields are used by Langflow for UI/API purposes.
"""

# APScheduler required fields
id: str = Field(max_length=191, primary_key=True)
next_run_time: datetime | None = Field(sa_column=Column(sa.DateTime(timezone=True)), default=None)
job_state: bytes | None = Field(sa_column=Column(sa.LargeBinary), default=None)

# Additional Langflow metadata
status: str = Field(default=JobStatus.PENDING)
result: dict | None = Field(sa_column=Column(JSON), default=None)
error: str | None = Field(default=None)
name: str = Field(index=True)
flow_id: UUID = Field(foreign_key="flow.id", index=True)
user_id: UUID = Field(foreign_key="user.id", index=True)
is_active: bool = Field(default=True, sa_column=Column(Boolean, server_default="true", nullable=False))
created_at: datetime = Field(sa_column=Column(DateTime, server_default=sa.func.now(), nullable=False))
updated_at: datetime = Field(
sa_column=Column(DateTime, server_default=sa.func.now(), nullable=False, onupdate=sa.func.now())
)


class JobRead(SQLModel):
"""Model for reading scheduled jobs."""

id: str
job_state: bytes | None
next_run_time: datetime | None
status: str

name: str
flow_id: UUID
user_id: UUID
is_active: bool
created_at: datetime
updated_at: datetime
result: dict | None
7 changes: 7 additions & 0 deletions src/backend/base/langflow/services/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ def update(self, service_name: ServiceType) -> None:
self.services.pop(service_name, None)
self.get(service_name)

async def setup(self) -> None:
"""Initialize all the services."""
for service in self.services.values():
result = service.setup()
if inspect.iscoroutine(result):
await result

async def teardown(self) -> None:
"""Teardown all the services."""
for service in self.services.values():
Expand Down
7 changes: 4 additions & 3 deletions src/backend/base/langflow/services/task/factory.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from langflow.services.factory import ServiceFactory
from langflow.services.settings.service import SettingsService
from langflow.services.task.service import TaskService


class TaskServiceFactory(ServiceFactory):
def __init__(self) -> None:
super().__init__(TaskService)

def create(self):
# Here you would have logic to create and configure a TaskService
return TaskService()
def create(self, settings_service: SettingsService) -> TaskService:
"""Create a new TaskService instance with the required dependencies."""
return TaskService(settings_service=settings_service)
Loading
Loading