Commit
Infrastructure config changes
Sohambutala committed Oct 23, 2024
1 parent 2197ba7 commit a8d8ced
Showing 4 changed files with 48 additions and 34 deletions.
28 changes: 9 additions & 19 deletions echodataflow/deployment/deployment_demo.yaml
@@ -26,11 +26,10 @@ services:
type: Process
cluster:
workers: 5
address: tcp://

address: tcp://
stages:
- name: edf_Sv_pipeline
module: echodataflow.stages.subflows.Sv_pipeline # Optional module name, echodataflow to create a default flow for all the stages within it
- name: Sv_flow
module: echodataflow.deployment.flow # Optional module name, echodataflow to create a default flow for all the stages within it
tasks:
- name: echodataflow_open_raw
module: echodataflow.stages.subflows.open_raw
@@ -54,20 +53,11 @@ services:
use_offline: true # Skip this process if zarr files are already present in the output directory.
group: False # Group Converted files based on Transect
source:
path: some_url # Single path, list or paths or a dict with some key and value as list of paths
parameters:
# Jinja support
window_options:
time_travel_hours: 20
time_travel_mins: 0
window_hours: 0
window_mins: 40
number_of_windows: 3
path: s3://ncei-wcsd-archive/data/raw/Bell_M._Shimada/SH1707/EK60/*.raw # Single path, list or paths or a dict with some key and value as list of paths
storage_options:
block_name: ABCD
block_type: AWS
anon: true
group:
path: s3://
file: s3://
grouping_regex: # pattern for grouping files based on filename
storage_options:
block_name: ABCD
@@ -102,7 +92,7 @@ services:
- name: stage_produce_mvbs
module: echodataflow.stages.subflows.compute_MVBS
source:
urlpath: s3://
path: s3://
parameters:
# Jinja support
window_options:
@@ -140,7 +130,7 @@ services:
- name: echodataflow_compute_NASC
module: echodataflow.stages.subflows.compute_NASC
source:
urlpath: s3://
path: s3://
parameters:
# Jinja support
window_options:
@@ -153,7 +143,7 @@ services:
block_name: ABCD
block_type: AWS
destination:
path: s3://
path: s3://
storage_options:
block_name: ABCD
block_type: AWS
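The reworked YAML groups the work pool and Dask cluster settings under the service's infrastructure, matching the new `Infrastructure` and `Cluster` models added in `deployment.py` below. Here is a minimal sketch of loading such a block into those models, assuming they are importable from `echodataflow.models.deployment.deployment` and that PyYAML is installed; the exact nesting and the scheduler address shown are illustrative assumptions, not copied from the file.

```python
# Minimal sketch (not the project's actual loader): parse an infrastructure
# block into the pydantic models added by this commit. The YAML nesting below
# is an assumption inferred from the Infrastructure/Cluster fields.
import yaml  # assumes PyYAML is available

from echodataflow.models.deployment.deployment import Cluster, EDFWorkPool, Infrastructure

raw = """
workpool:
  name: Echodataflow-WorkPool
  type: Process
cluster:
  workers: 5
  address: tcp://127.0.0.1:8786   # hypothetical scheduler address
"""

data = yaml.safe_load(raw)

infra = Infrastructure(
    workpool=EDFWorkPool(**data["workpool"]),
    cluster=Cluster(**data["cluster"]),
)

print(infra.workpool.name, infra.workpool.workqueue_name)  # queue falls back to "default"
print(infra.cluster.workers, infra.cluster.address)
```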
8 changes: 4 additions & 4 deletions echodataflow/deployment/deployment_engine.py
@@ -51,13 +51,13 @@ async def _deploy_service(

schedule = [DeploymentScheduleCreate(schedule=IntervalSchedule(interval=timedelta(minutes=service.schedule.interval_mins), anchor_date=service.schedule.anchor_date))]

await create_work_pool_and_queue(service.workpool)
await create_work_pool_and_queue(service.infrastructure.workpool)

deployment: RunnerDeployment = await edf_service_fn.to_deployment(
name=service.name,
parameters={"stages": service.stages, "edf_logger": logging_dict},
work_queue_name=service.workpool.name,
work_pool_name=service.workpool.workqueue.name,
parameters={"stages": service.stages, "edf_logger": logging_dict, "cluster": service.infrastructure.cluster},
work_pool_name=service.infrastructure.workpool.name,
work_queue_name=service.infrastructure.workpool.workqueue_name,
tags=service.tags,
schedules=schedule
)
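With the work pool and queue now read from `service.infrastructure`, the cluster settings travel to the flow as an ordinary parameter. The following is a hypothetical sketch of how a receiving flow could use that parameter to attach to Dask; the real `echodataflow.deployment.flow` is not shown in this diff, and treating `cluster` as a dict of the `Cluster` fields is an assumption.

```python
# Hypothetical sketch of a flow consuming the new "cluster" parameter.
from typing import Optional

from dask.distributed import Client, LocalCluster
from prefect import flow


@flow
def edf_service_fn_sketch(stages: list, edf_logger: Optional[dict] = None, cluster: Optional[dict] = None):
    if cluster and cluster.get("address"):
        # Connect to an existing Dask scheduler, e.g. address: tcp://...
        client = Client(cluster["address"])
    else:
        # Fall back to a local cluster sized by the "workers" field (model default is 3).
        workers = (cluster or {}).get("workers", 3)
        client = Client(LocalCluster(n_workers=workers))
    try:
        ...  # run the configured stages against `client`
    finally:
        client.close()
```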
44 changes: 34 additions & 10 deletions echodataflow/models/deployment/deployment.py
@@ -1,9 +1,8 @@
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field, ValidationInfo, field_validator
from prefect.client.schemas.objects import WorkPool, WorkQueue

# Assuming these imports exist in your module
from echodataflow.models.deployment.deployment_schedule import DeploymentSchedule
from echodataflow.models.deployment.stage import Stage
from echodataflow.models.deployment.storage_options import StorageOptions
@@ -28,6 +27,7 @@ def validate_handler(cls, v):

class EDFWorkPool(WorkPool):
name: str = Field("Echodataflow-WorkPool", description="Name of the WorkPool.")
type: str = Field("Process", description="Type of WorkPool.")
workqueue: Union[str, WorkQueue] = Field("default", description="WorkQueue associated with the WorkPool.")

@property
@@ -36,6 +36,20 @@ def workqueue_name(self) -> str:
return self.workqueue.name
return self.workqueue

class config:
arbitrary_types_allowed=True

class Cluster(BaseModel):
address: Optional[str] = Field(None, description="Dask scheduler address")
workers: Optional[int] = Field(3, description="Number of workers")

class Infrastructure(BaseModel):
workpool: Optional[EDFWorkPool] = Field(EDFWorkPool(), description="WorkPool configuration for the service.")
cluster: Optional[Cluster] = Field(None, description="Dask Cluster Configuration")

class Config:
arbitrary_types_allowed = True

class Service(BaseModel):
"""
Model for defining a service in the deployment pipeline.
@@ -56,7 +70,7 @@ class Service(BaseModel):
schedule: Optional[DeploymentSchedule] = Field(None, description="Scheduling details for the service.")
stages: List[Stage] = Field(None, description="List of stages included in the service.")
logging: Optional[EDFLogging] = Field(None, description="Logging configuration for the service.")
workpool: Optional[EDFWorkPool] = Field(WorkPool(name="Echodataflow-WorkPool", type="Process"), description="WorkPool configuration for the service.")
infrastructure: Optional[Infrastructure] = Field(Infrastructure(), description="Infrastructure configuration for the service.")

# Validators
@field_validator("name", mode="before")
@@ -79,6 +93,8 @@ def validate_tags(cls, v):
return list(unique_tags)
return v

class Config:
arbitrary_types_allowed = True

class Deployment(BaseModel):
"""
@@ -101,24 +117,32 @@ def validate_services(cls, v):
return v

@field_validator("base_path", mode="after")
def construct_path(cls, v):

def construct_path(cls, v, info: ValidationInfo):
"""
This validator ensures that if a base_path is provided, it is applied to all services.
"""
services = info.data.get("services", [])
if v is not None:
for service in cls.services:
for service in services:
for stage in service.stages:
stage.source.path = v + stage.source.path
stage.destination.path = v + stage.destination.path
stage.group.path = v + stage.group.path
return v

@field_validator("storage_options", mode="after")
def apply_storage_options(cls, v):

def apply_storage_options(cls, v, info: ValidationInfo):
"""
This validator ensures that if storage options are provided, they are applied to all services.
"""
services = info.data.get("services", [])
if v is not None:
for service in cls.services:
for service in services:
for stage in service.stages:
stage.source.storage_options = v
stage.destination.storage_options = v
stage.group.storage_options = v
return v


class Config:
arbitrary_types_allowed = True
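The validator changes follow the pydantic v2 convention: class-level access such as `cls.services` only yields the field definition, so previously validated values have to come from `ValidationInfo.data`. Below is a standalone illustration of that pattern (not echodataflow code), which also shows why the field being read must be declared before the field being validated.

```python
# Standalone illustration of the ValidationInfo pattern adopted by this commit.
from typing import List, Optional

from pydantic import BaseModel, ValidationInfo, field_validator


class Stage(BaseModel):
    path: str


class Demo(BaseModel):
    stages: List[Stage]            # must be declared before base_path
    base_path: Optional[str] = None

    @field_validator("base_path", mode="after")
    def apply_base_path(cls, v: Optional[str], info: ValidationInfo):
        # info.data holds already-validated sibling fields; cls.stages would
        # only be the field definition, never the parsed values.
        stages = info.data.get("stages", [])
        if v is not None:
            for stage in stages:
                stage.path = v + stage.path
        return v


demo = Demo(stages=[{"path": "/raw/file.raw"}], base_path="s3://bucket")
print([s.path for s in demo.stages])  # ['s3://bucket/raw/file.raw']
```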
2 changes: 1 addition & 1 deletion echodataflow/utils/prefect_utils.py
@@ -97,7 +97,7 @@ async def create_work_pool_and_queue(edf_work_pool: EDFWorkPool):
# Use the Prefect client asynchronously to interact with the API
async with get_client() as client:
# Get or create the work pool
work_pool = await _get_or_create_work_pool(client=client, work_pool_name=edf_work_pool.name)
work_pool = await _get_or_create_work_pool(client=client, work_pool=edf_work_pool)

# If the work pool exists, proceed to get or create the work queue
if work_pool:
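The call site now hands the whole `EDFWorkPool` to the helper rather than just its name, presumably so the pool's `type` is available when the pool has to be created. The helper's body is not part of this diff; the sketch below is only a guess at its shape, assuming the Prefect client's `read_work_pool`/`create_work_pool` methods and the `ObjectNotFound` exception behave as described.

```python
# Hypothetical sketch of a get-or-create helper matching the new call site;
# the real _get_or_create_work_pool body is not included in this commit.
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.exceptions import ObjectNotFound


async def get_or_create_work_pool_sketch(client: PrefectClient, work_pool):
    try:
        # Look the pool up by name first.
        return await client.read_work_pool(work_pool_name=work_pool.name)
    except ObjectNotFound:
        # Receiving the whole EDFWorkPool lets the helper reuse its type
        # (e.g. "Process") when the pool does not exist yet.
        return await client.create_work_pool(
            work_pool=WorkPoolCreate(name=work_pool.name, type=work_pool.type)
        )
```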
