From b4733e8d19e4ad632449cb3a9cce1efe6d254e5d Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 16:35:06 +0100 Subject: [PATCH 1/7] Add process_data logic --- week_1/project/week_1.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index ccfe963b..626b94bc 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -50,9 +50,15 @@ def get_s3_data(context): return output -@op -def process_data(): - pass +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg_max": Out(dagster_type=Aggregation)}, + description="Determine the stock with the highest value" +) +def process_data(stocks: List[Stock]) -> Aggregation: + # Find the stock with the highest value + max_stock = max(stocks, key=lambda x: x.high) + return Aggregation(date=max_stock.date, high=max_stock.high) @op From 8785918bacfff467daa3488a1217336ce1f43dae Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 17:35:32 +0100 Subject: [PATCH 2/7] Add stub for pub_redis_data --- week_1/project/week_1.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 626b94bc..9e07be25 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List -from dagster import In, Nothing, Out, job, op, usable_as_dagster_type +from dagster import In, Nothing, Out, job, op, usable_as_dagster_type, get_dagster_logger from pydantic import BaseModel @@ -61,8 +61,15 @@ def process_data(stocks: List[Stock]) -> Aggregation: return Aggregation(date=max_stock.date, high=max_stock.high) -@op -def put_redis_data(): +@op( + ins={"agg_max": In(dagster_type=Aggregation)}, + out=None, + description="Write to Redis" +) +def put_redis_data(agg_max: Aggregation) -> None: + # Log the output + logger = get_dagster_logger() + logger.info(f"Write {agg_max} to Redis.") pass From 60b321a0c7d587bb470138a68580346a73a688fa Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sat, 20 Aug 2022 17:37:00 +0100 Subject: [PATCH 3/7] Add ops to a job --- week_1/project/week_1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/week_1/project/week_1.py b/week_1/project/week_1.py index 9e07be25..8c15e69c 100644 --- a/week_1/project/week_1.py +++ b/week_1/project/week_1.py @@ -75,4 +75,4 @@ def put_redis_data(agg_max: Aggregation) -> None: @job def week_1_pipeline(): - pass + put_redis_data(process_data(get_s3_data())) From 3dbe4e09bb96a02d4b06e719770df5956d05f748 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Wed, 24 Aug 2022 11:26:07 +0100 Subject: [PATCH 4/7] Add resources for S3 and Redis --- week_2/dagster_ucr/resources.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index adeb742f..b4829561 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -91,13 +91,34 @@ def mock_s3_resource(): return s3_mock -@resource -def s3_resource(): +@resource( + config_schema={ + "bucket": Field(String), + "access_key": Field(String), + "secret_key": Field(String), + "endpoint_url": Field(String), + }, + description="A resource that can connect to S3." +) +def s3_resource(context) -> S3: """This resource defines a S3 client""" - pass + return S3( + bucket=context.resource_config["bucket"], + access_key=context.resource_config["access_key"], + secret_key=context.resource_config["secret_key"], + endpoint_url=context.resource_config["endpoint_url"], + ) -@resource -def redis_resource(): +@resource( + config_schema={ + "host": Field(String), + "port": Field(Int), + } +) +def redis_resource(context) -> Redis: """This resource defines a Redis client""" - pass + return Redis( + host=context.resource_config["host"], + port=context.resource_config["port"], + ) From 439c1940ca61575be81547f0fe5a2695726b2a59 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Sun, 28 Aug 2022 19:11:54 +0100 Subject: [PATCH 5/7] Complete ops and graph --- week_2/dagster_ucr/project/week_2.py | 45 ++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index 3313403d..f59b1491 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -5,26 +5,47 @@ from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource -@op -def get_s3_data(): - pass +@op( + config_schema={"s3_key": str}, + required_resource_keys={"s3"}, + out={"stocks": Out(dagster_type=List[Stock])}, + tags={"kind": "s3"}, + description="Get a list of stocks from an S3 file", +) +def get_s3_data(context) -> List[Stock]: + stocks = context.resources.s3.get_data(context.op_config["s3_key"]) + return [Stock.from_list(stock) for stock in stocks] -@op -def process_data(): - # Use your op from week 1 - pass +@op( + ins={"stocks": In(dagster_type=List[Stock])}, + out={"agg_max": Out(dagster_type=Aggregation)}, + tags={"kind": "transformation"}, + description="Determine the stock with the highest value" +) +def process_data(stocks: List[Stock]) -> Aggregation: + # Find the stock with the highest value + max_stock = max(stocks, key=lambda x: x.high) + return Aggregation(date=max_stock.date, high=max_stock.high) -@op -def put_redis_data(): - pass +@op( + ins={"agg_max": In(dagster_type=Aggregation)}, + required_resource_keys={"redis"}, + out=Out(Nothing), + tags={"kind": "redis"}, + description="Write to Redis" +) +def put_redis_data(context, agg_max: Aggregation) -> None: + data = context.resources.redis.put_data(str(agg_max.date), str(agg_max.high)) + context.log.info(f"Write {data} to Redis.") @graph def week_2_pipeline(): - # Use your graph from week 1 - pass + s3_data = get_s3_data() + highest_stock = process_data(s3_data) + put_redis_data(highest_stock) local = { From 069a6043ed61a59eaa84b3c7056d9f4755d8abb7 Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 29 Aug 2022 13:47:11 +0100 Subject: [PATCH 6/7] Add description to Redis resource --- week_2/dagster_ucr/resources.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/week_2/dagster_ucr/resources.py b/week_2/dagster_ucr/resources.py index b4829561..e00f36e2 100644 --- a/week_2/dagster_ucr/resources.py +++ b/week_2/dagster_ucr/resources.py @@ -114,7 +114,8 @@ def s3_resource(context) -> S3: config_schema={ "host": Field(String), "port": Field(Int), - } + }, + description="A resource that connects to Redis.", ) def redis_resource(context) -> Redis: """This resource defines a Redis client""" From 3586228c1baf30c1ff54463c6ea8a6d85126e57b Mon Sep 17 00:00:00 2001 From: Ian Young Date: Mon, 29 Aug 2022 13:48:51 +0100 Subject: [PATCH 7/7] Use Dagster Type & Add logging example without context. --- week_2/dagster_ucr/project/week_2.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/week_2/dagster_ucr/project/week_2.py b/week_2/dagster_ucr/project/week_2.py index f59b1491..18c0665c 100644 --- a/week_2/dagster_ucr/project/week_2.py +++ b/week_2/dagster_ucr/project/week_2.py @@ -1,6 +1,6 @@ from typing import List -from dagster import In, Nothing, Out, ResourceDefinition, graph, op +from dagster import In, Nothing, Out, ResourceDefinition, graph, op, get_dagster_logger from dagster_ucr.project.types import Aggregation, Stock from dagster_ucr.resources import mock_s3_resource, redis_resource, s3_resource @@ -26,13 +26,16 @@ def get_s3_data(context) -> List[Stock]: def process_data(stocks: List[Stock]) -> Aggregation: # Find the stock with the highest value max_stock = max(stocks, key=lambda x: x.high) + # Log the output + logger = get_dagster_logger() + logger.info(f"Higest stock is: {max_stock}") return Aggregation(date=max_stock.date, high=max_stock.high) @op( ins={"agg_max": In(dagster_type=Aggregation)}, required_resource_keys={"redis"}, - out=Out(Nothing), + out=Out(dagster_type=Nothing), tags={"kind": "redis"}, description="Write to Redis" )