diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py
index ccfe963b..8c15e69c 100644
--- a/week_1/project/week_1.py
+++ b/week_1/project/week_1.py
@@ -2,7 +2,7 @@
 from datetime import datetime
 from typing import List
 
-from dagster import In, Nothing, Out, job, op, usable_as_dagster_type
+from dagster import In, Nothing, Out, get_dagster_logger, job, op, usable_as_dagster_type
 from pydantic import BaseModel
 
 
@@ -50,16 +50,31 @@ def get_s3_data(context):
     return output
 
 
-@op
-def process_data():
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    description="Determine the stock with the highest value",
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    """Return an Aggregation holding the date and high of the stock with the greatest ``high``."""
+    if not stocks:
+        # Fail with a descriptive message instead of max()'s generic empty-sequence error.
+        raise ValueError("process_data received no stocks to aggregate")
+    max_stock = max(stocks, key=lambda stock: stock.high)
+    return Aggregation(date=max_stock.date, high=max_stock.high)
 
 
-@op
-def put_redis_data():
-    pass
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    out=Out(dagster_type=Nothing),
+    description="Write to Redis",
+)
+def put_redis_data(agg_max: Aggregation) -> None:
+    """Log the aggregation that would be written; this op does not use a Redis client."""
+    logger = get_dagster_logger()
+    logger.info(f"Write {agg_max} to Redis.")
 
 
 @job
 def week_1_pipeline():
-    pass
+    put_redis_data(process_data(get_s3_data()))
diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py
index 3313403d..18c0665c 100644
--- a/week_2/dagster_ucr/project/week_2.py
+++ b/week_2/dagster_ucr/project/week_2.py
@@ -1,30 +1,58 @@
 from typing import List
 
-from dagster import In, Nothing, Out, ResourceDefinition, graph, op
+from dagster import In, Nothing, Out, ResourceDefinition, get_dagster_logger, graph, op
 from dagster_ucr.project.types import Aggregation, Stock
 from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource
 
 
-@op
-def get_s3_data():
-    pass
+@op(
+    config_schema={"s3_key": str},
+    required_resource_keys={"s3"},
+    out={"stocks": Out(dagster_type=List[Stock])},
+    tags={"kind": "s3"},
+    description="Get a list of stocks from an S3 file",
+)
+def get_s3_data(context) -> List[Stock]:
+    """Load the data stored under the configured ``s3_key`` and build a Stock from each item."""
+    records = context.resources.s3.get_data(context.op_config["s3_key"])
+    return [Stock.from_list(record) for record in records]
 
 
-@op
-def process_data():
-    # Use your op from week 1
-    pass
+@op(
+    ins={"stocks": In(dagster_type=List[Stock])},
+    out={"agg_max": Out(dagster_type=Aggregation)},
+    tags={"kind": "transformation"},
+    description="Determine the stock with the highest value",
+)
+def process_data(stocks: List[Stock]) -> Aggregation:
+    """Return an Aggregation holding the date and high of the stock with the greatest ``high``."""
+    if not stocks:
+        # Fail with a descriptive message instead of max()'s generic empty-sequence error.
+        raise ValueError("process_data received no stocks to aggregate")
+    max_stock = max(stocks, key=lambda stock: stock.high)
+    get_dagster_logger().info(f"Highest stock is: {max_stock}")
+    return Aggregation(date=max_stock.date, high=max_stock.high)
 
 
-@op
-def put_redis_data():
-    pass
+@op(
+    ins={"agg_max": In(dagster_type=Aggregation)},
+    required_resource_keys={"redis"},
+    out=Out(dagster_type=Nothing),
+    tags={"kind": "redis"},
+    description="Write to Redis",
+)
+def put_redis_data(context, agg_max: Aggregation) -> None:
+    """Pass the aggregation's date and high, as strings, to the Redis resource and log the result."""
+    # NOTE(review): this logs put_data's return value; confirm put_data does not return None.
+    data = context.resources.redis.put_data(str(agg_max.date), str(agg_max.high))
+    context.log.info(f"Write {data} to Redis.")
 
 
 @graph
 def week_2_pipeline():
-    # Use your graph from week 1
-    pass
+    s3_data = get_s3_data()
+    highest_stock = process_data(s3_data)
+    put_redis_data(highest_stock)
 
 
 local = {
diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py
index adeb742f..e00f36e2 100644
--- a/week_2/dagster_ucr/resources.py
+++ b/week_2/dagster_ucr/resources.py
@@ -91,13 +91,35 @@ def mock_s3_resource():
     return s3_mock
 
 
-@resource
-def s3_resource():
-    """This resource defines a S3 client"""
-    pass
+@resource(
+    config_schema={
+        "bucket": Field(String),
+        "access_key": Field(String),
+        "secret_key": Field(String),
+        "endpoint_url": Field(String),
+    },
+    description="A resource that can connect to S3.",
+)
+def s3_resource(context) -> S3:
+    """Return an S3 client built from this resource's config values."""
+    return S3(
+        bucket=context.resource_config["bucket"],
+        access_key=context.resource_config["access_key"],
+        secret_key=context.resource_config["secret_key"],
+        endpoint_url=context.resource_config["endpoint_url"],
+    )
 
 
-@resource
-def redis_resource():
-    """This resource defines a Redis client"""
-    pass
+@resource(
+    config_schema={
+        "host": Field(String),
+        "port": Field(Int),
+    },
+    description="A resource that connects to Redis.",
+)
+def redis_resource(context) -> Redis:
+    """Return a Redis client built from this resource's config values."""
+    return Redis(
+        host=context.resource_config["host"],
+        port=context.resource_config["port"],
+    )