-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add async task scheduling with Job model and API endpoints #5421
base: main
Are you sure you want to change the base?
Conversation
- Introduced a new Job model to manage scheduled jobs, extending APScheduler's functionality with additional metadata for Langflow. - Updated the __init__.py file to include the Job model in the exports, ensuring it is accessible throughout the application.
- Introduced a new Alembic migration script to create a 'job' table in the database. - The 'job' table includes fields for job metadata, such as next run time, job state, and foreign keys for flow and user associations. - Added indices for improved query performance on job name, flow ID, and user ID. - Implemented downgrade functionality to remove the job table and its indices if necessary.
- Introduced a new `tasks.py` file containing the API router for task management. - Implemented endpoints for creating, retrieving, and canceling tasks. - Updated the main router to include the new tasks router. - Enhanced the `__init__.py` file to export the tasks router for accessibility throughout the application.
- Added a new `jobstore.py` file that defines the `LangFlowJobStore` class, which integrates with SQLModel to manage scheduled jobs in the Langflow database. - Implemented methods for adding, updating, removing, and retrieving jobs, including support for one-off tasks. - Enhanced job restoration and error handling during job retrieval and storage. - Introduced functionality to fetch user-specific jobs and due jobs based on the current time. - Improved job state management using serialization with pickle for job persistence.
- Renamed `LangFlowJobStore` to `AsyncSQLModelJobStore` for clarity and consistency with async operations. - Added a new `scheduler.py` file implementing `AsyncScheduler`, enhancing job scheduling capabilities with asynchronous support. - Improved job processing and management, allowing for better handling of scheduled tasks in an asynchronous environment. - Updated job addition and removal methods to be compatible with async execution, ensuring efficient job handling.
- Refactored the `create` method in `TaskServiceFactory` to accept a `SettingsService` instance as a parameter. - Enhanced the `create` method's docstring to clarify its purpose in creating a `TaskService` with required dependencies.
- Introduced an asynchronous scheduler and job store to manage tasks effectively. - Added event listeners for job execution and error handling, updating task statuses accordingly. - Implemented methods for creating, retrieving, and canceling tasks, with support for user-specific job queries. - Refactored task status management using an Enum for better clarity and control over task states. - Improved error handling and logging throughout task operations for better traceability.
- Introduced comprehensive unit tests for the task management API, covering task creation, retrieval, cancellation, and access control. - Implemented tests for handling invalid requests and malicious input to ensure robustness and security. - Added fixtures for user setup and authentication to streamline test execution. - Enhanced test coverage for task status transitions and concurrent task creation scenarios.
…ocumentation - Updated the `start` and `_start` methods in `AsyncScheduler` to include type hints for the `paused` parameter. - Enhanced the docstring for the `start` method to clarify its functionality and parameter usage. - Added type hinting for the `_eventloop` attribute to improve code clarity and maintainability. - Included a type ignore comment to suppress type checking warnings in the `_start` method.
- Bumped the version of the blockbuster package from 1.5.6 to 1.5.7. - Updated the source distribution URL and hash for the new version. - Adjusted the wheel URL and hash to reflect the latest release. - Modified the development requirements to specify the new version constraint for blockbuster.
- Added an `initialize` method to the `TaskService` class to set up the async scheduler and job store upon instantiation. - Introduced a `_started` attribute to track the initialization state of the service. - Enhanced the service's constructor to call the `initialize` method, ensuring proper setup of scheduling components.
- Added a constructor to the AsyncScheduler class to set the default timezone to UTC. - Enhanced the initialization process to ensure proper timezone handling for scheduled tasks.
- Introduced an `initialize` method in the `Service` class to facilitate asynchronous setup processes. - This addition enhances the service's lifecycle management, allowing for future initialization logic to be implemented.
…tion - Introduced an `async setup` method in the `ServiceManager` class to initialize all services asynchronously. - Updated the `initialize_services` function to call the new setup method, ensuring proper service initialization during application startup. - This enhancement improves the lifecycle management of services, allowing for better handling of asynchronous setup processes.
- Changed the method name from `initialize` to `setup` in the `Service` class to better reflect its purpose in the service lifecycle. - This update aligns with the recent enhancements in service management, improving code readability and consistency.
…roved clarity - Changed the method name from `initialize` to `setup` in the `ServiceManager` class to better reflect its purpose in the service lifecycle. - Updated the service invocation to call the new `setup` method, ensuring consistency with recent enhancements in service management.
…nitialization - Refactored the TaskService class to replace the synchronous `initialize` method with an asynchronous `setup` method for better service lifecycle management. - Enhanced the scheduler initialization process by using `asyncio.to_thread` to ensure non-blocking behavior during setup. - Added checks to ensure the scheduler is properly set up before starting, improving reliability in task scheduling.
- Added AsyncSQLModelJobStore class to manage jobs using SQLModel, supporting one-off tasks. - Implemented methods for job retrieval, addition, updating, and deletion, ensuring robust handling of job states. - Enhanced error handling and logging for job restoration and management processes. - Introduced functionality to fetch user-specific jobs and due jobs based on the current time.
…spath in pydevd_file_utils
…outine instead of iscoroutinefunction
…th JobStatus enum
Copilot
AI
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 9 out of 22 changed files in this pull request and generated 1 comment.
Files not reviewed (13)
- src/backend/base/langflow/alembic/versions/a6faa131285d_add_job_table.py: Evaluated as low risk
- src/backend/base/langflow/api/router.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/init.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/tasks.py: Evaluated as low risk
- src/backend/base/langflow/services/base.py: Evaluated as low risk
- src/backend/tests/unit/services/tasks/test_tasks_service.py: Evaluated as low risk
- src/backend/tests/unit/api/v1/test_tasks.py: Evaluated as low risk
- src/backend/tests/conftest.py: Evaluated as low risk
- src/backend/base/pyproject.toml: Evaluated as low risk
- src/backend/base/langflow/services/utils.py: Evaluated as low risk
- src/backend/base/langflow/services/database/models/init.py: Evaluated as low risk
- src/backend/base/langflow/services/task/service.py: Evaluated as low risk
- src/backend/base/langflow/services/task/factory.py: Evaluated as low risk
Comments suppressed due to low confidence (4)
src/backend/base/langflow/services/task/jobstore.py:107
- [nitpick] The logic in add_job for extracting flow_id and api_key_user_id could be simplified or moved to a helper function for better readability.
job_state = pickle.dumps(job.__getstate__())
src/backend/base/langflow/services/database/models/job/model.py:33
- The status field should be of type JobStatus instead of str to ensure type safety.
status: str = Field(default=JobStatus.PENDING)
src/backend/base/langflow/services/database/models/job/model.py:52
- The status field in JobRead model should be of type JobStatus instead of str to ensure type safety.
status: str
src/backend/base/langflow/services/database/models/job/model.py:35
- [nitpick] The error field should be of type Optional[str] to be more descriptive.
error: str | None = Field(default=None)
|
||
def _reconstitute_job(self, job_state): | ||
"""Reconstitute a job from its serialized state.""" | ||
job_state_dict = job_state if isinstance(job_state, dict) else pickle.loads(job_state) # noqa: S301 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lack of error handling around pickle.loads. Add try-except block to handle potential deserialization errors.
Copilot is powered by AI, so mistakes are possible. Review output carefully before use.
This pull request introduces a comprehensive async task scheduling system to the application. It includes the addition of the APScheduler dependency, a new Job model for managing scheduled tasks, and an Alembic migration script to create the corresponding database table. The implementation features a dedicated API router for task management, allowing users to create, retrieve, and cancel tasks. Enhancements to the TaskService facilitate async scheduling and job management, while robust error handling and logging improve traceability. Unit tests have been added to ensure the reliability of the task management API and job model functionalities. Overall, this PR significantly enhances the application's capabilities for handling scheduled tasks asynchronously.