An ETL pipeline with Amazon Redshift and AWS Glue example (#498)
Showing 7 changed files with 352 additions and 0 deletions.
@@ -0,0 +1,10 @@
### Scala and JVM
*.class
*.log
.bsp
.scala-build

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

kubeconfig.json
@@ -0,0 +1,224 @@
import besom.*
import besom.api.aws
import besom.json.*

@main def main = Pulumi.run {

  val clusterIdentifier = "my-redshift-cluster"
  val clusterDBName = "dev"
  val clusterDBUsername = "admin"
  val clusterDBPassword = "Password!123"
  val glueDBName = "my-glue-db"

  // Create an S3 bucket to store some raw data.
  val eventsBucket = aws.s3.Bucket(
    name = "events",
    aws.s3.BucketArgs(forceDestroy = true)
  )

  // Create a VPC.
  val vpc = aws.ec2.Vpc(
    name = "vpc",
    aws.ec2.VpcArgs(
      cidrBlock = "10.0.0.0/16",
      enableDnsHostnames = true
    )
  )

  // Create a private subnet within the VPC.
  val subnet = aws.ec2.Subnet(
    name = "subnet",
    aws.ec2.SubnetArgs(
      vpcId = vpc.id,
      cidrBlock = "10.0.1.0/24"
    )
  )

  // Declare a Redshift subnet group with the subnet ID.
  val subnetGroup = aws.redshift.SubnetGroup(
    name = "subnet-group",
    aws.redshift.SubnetGroupArgs(
      subnetIds = List(subnet.id)
    )
  )

  // Create an IAM role granting Redshift read-only access to S3.
  val redshiftRole = aws.iam.Role(
    name = "redshift-role",
    aws.iam.RoleArgs(
      assumeRolePolicy = json"""{
        "Version": "2012-10-17",
        "Statement": [
          {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
              "Service": "redshift.amazonaws.com"
            }
          }
        ]
      }""".map(_.prettyPrint),
      managedPolicyArns = List(
        aws.iam.enums.ManagedPolicy.AmazonS3ReadOnlyAccess.value
      )
    )
  )

  // Create a VPC endpoint so the cluster can read from S3 over the private network.
  val vpcEndpoint = aws.ec2.VpcEndpoint(
    name = "s3-vpc-endpoint",
    aws.ec2.VpcEndpointArgs(
      vpcId = vpc.id,
      serviceName = p"com.amazonaws.${aws.getRegion(aws.GetRegionArgs()).name}.s3",
      routeTableIds = List(vpc.mainRouteTableId)
    )
  )

  // Create a single-node Redshift cluster in the VPC.
  val cluster = aws.redshift.Cluster(
    name = "cluster",
    aws.redshift.ClusterArgs(
      clusterIdentifier = clusterIdentifier,
      databaseName = clusterDBName,
      masterUsername = clusterDBUsername,
      masterPassword = clusterDBPassword,
      nodeType = "ra3.xlplus",
      clusterSubnetGroupName = subnetGroup.name,
      clusterType = "single-node",
      publiclyAccessible = false,
      skipFinalSnapshot = true,
      vpcSecurityGroupIds = List(vpc.defaultSecurityGroupId),
      iamRoles = List(redshiftRole.arn)
    )
  )

  // Define an AWS cron expression of "every 15 minutes".
  // https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchevents-expressions.html
  val every15minutes = "cron(0/15 * * * ? *)"

  // Create a Glue catalog database.
  val glueCatalogDB = aws.glue.CatalogDatabase(
    name = "glue-catalog-db",
    aws.glue.CatalogDatabaseArgs(
      name = glueDBName
    )
  )

  // Define an IAM role granting AWS Glue access to S3 and other Glue-required services.
  val glueRole = aws.iam.Role(
    name = "glue-role",
    aws.iam.RoleArgs(
      assumeRolePolicy = json"""{
        "Version": "2012-10-17",
        "Statement": [
          {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
              "Service": "glue.amazonaws.com"
            }
          }
        ]
      }""".map(_.prettyPrint),
      managedPolicyArns = List(
        aws.iam.enums.ManagedPolicy.AmazonS3FullAccess.value,
        aws.iam.enums.ManagedPolicy.AWSGlueServiceRole.value
      )
    )
  )

  // Create a Glue crawler to process the contents of the data bucket on a schedule.
  // https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html
  val glueCrawler = aws.glue.Crawler(
    name = "glue-crawler",
    aws.glue.CrawlerArgs(
      databaseName = glueCatalogDB.name,
      role = glueRole.arn,
      schedule = every15minutes,
      s3Targets = List(
        aws.glue.inputs.CrawlerS3TargetArgs(
          path = p"s3://${eventsBucket.bucket}"
        )
      )
    )
  )

  // Create a Glue connection to the Redshift cluster.
  val glueRedshiftConnection = aws.glue.Connection(
    name = "glue-redshift-connection",
    aws.glue.ConnectionArgs(
      connectionType = "JDBC",
      connectionProperties = Map(
        "JDBC_CONNECTION_URL" -> p"jdbc:redshift://${cluster.endpoint}/${clusterDBName}",
        "USERNAME" -> clusterDBUsername,
        "PASSWORD" -> clusterDBPassword
      ),
      physicalConnectionRequirements = aws.glue.inputs.ConnectionPhysicalConnectionRequirementsArgs(
        securityGroupIdLists = cluster.vpcSecurityGroupIds,
        availabilityZone = subnet.availabilityZone,
        subnetId = subnet.id
      )
    )
  )

  // Create an S3 bucket for Glue scripts and temporary storage.
  val glueJobBucket = aws.s3.Bucket(
    name = "glue-job-bucket",
    aws.s3.BucketArgs(
      forceDestroy = true
    )
  )

  // Upload a Glue job script.
  val glueJobScript = aws.s3.BucketObject(
    name = "glue-job.py",
    aws.s3.BucketObjectArgs(
      bucket = glueJobBucket.id,
      source = Asset.FileAsset("./glue-job.py")
    )
  )

  // Create a Glue job that runs our Python ETL script.
  val glueJob = aws.glue.Job(
    name = "glue-job",
    aws.glue.JobArgs(
      roleArn = glueRole.arn,
      glueVersion = "3.0",
      numberOfWorkers = 10,
      workerType = "G.1X",
      defaultArguments = Map(
        // Enabling job bookmarks helps you avoid loading duplicate data.
        // https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html
        "--job-bookmark-option" -> "job-bookmark-enable",
        "--ConnectionName" -> glueRedshiftConnection.name,
        "--GlueDBName" -> glueDBName,
        "--GlueDBTableName" -> eventsBucket.bucket.map(_.replace("-", "_")),
        "--RedshiftDBName" -> clusterDBName,
        "--RedshiftDBTableName" -> "events",
        "--RedshiftRoleARN" -> redshiftRole.arn,
        "--TempDir" -> p"s3://${glueJobBucket.bucket}/glue-job-temp"
      ),
      connections = List(glueRedshiftConnection.name),
      command = aws.glue.inputs.JobCommandArgs(
        scriptLocation = p"s3://${glueJobBucket.bucket}/glue-job.py",
        pythonVersion = "3"
      )
    )
  )

  // Create a Glue trigger to run the job every 15 minutes.
  val glueJobTrigger = aws.glue.Trigger(
    name = "trigger",
    aws.glue.TriggerArgs(
      schedule = every15minutes,
      `type` = "SCHEDULED",
      actions = List(
        aws.glue.inputs.TriggerActionArgs(jobName = glueJob.name)
      )
    )
  )

  Stack(vpcEndpoint, glueCrawler, glueJobScript, glueJobTrigger).exports(
    dataBucketName = eventsBucket.bucket
  )
}
@@ -0,0 +1,3 @@
name: aws-redshift-glue-etl
description: An ETL pipeline with Amazon Redshift and AWS Glue
runtime: scala
@@ -0,0 +1,45 @@
# ETL pipeline with Amazon Redshift and AWS Glue

This example creates an ETL pipeline using Amazon Redshift and AWS Glue. The pipeline extracts data from an S3 bucket
with a Glue crawler, transforms it with a Python script wrapped in a Glue job, and loads it into a Redshift database
deployed in a VPC.

## Prerequisites

[Follow the instructions](https://www.pulumi.com/docs/clouds/aws/get-started/begin/)
to get started with Pulumi & AWS.

## Deploying

1. Create a new stack, which is an isolated deployment target for this example:

   ```bash
   pulumi stack init dev
   ```

2. Set the AWS region:

   ```bash
   pulumi config set aws:region us-west-2
   ```

3. Stand up the cluster:

   ```bash
   pulumi up
   ```

4. In a few moments, the Redshift cluster and Glue components will be up and running, and the S3 bucket name
   will be emitted as a Pulumi stack output:

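   The bucket name comes from the `dataBucketName` export at the end of the program, for example:

   ```bash
   pulumi stack output dataBucketName
   ```
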
5. Upload the included sample data file to S3 to verify the automation works as expected:

   ```bash
   aws s3 cp events-1.txt s3://$(pulumi stack output dataBucketName)
   ```

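   The crawler and the job each run on a 15-minute schedule, so it can take up to two cycles before rows appear
   in Redshift. One way to confirm the load (not part of this example, and assuming the AWS CLI with the Redshift
   Data API is available) is to query the cluster directly, using the identifier, database, and user defined in
   the program:

   ```bash
   # Query the `events` table that the Glue job loads.
   aws redshift-data execute-statement \
     --cluster-identifier my-redshift-cluster \
     --database dev \
     --db-user admin \
     --sql "SELECT * FROM events;"

   # Fetch the results using the statement ID returned by the previous command.
   aws redshift-data get-statement-result --id <statement-id>
   ```
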
6. When you're ready, destroy your stack and remove it:

   ```bash
   pulumi destroy --yes
   pulumi stack rm --yes
   ```
@@ -0,0 +1,3 @@
{"id": 1, "name": "An interesting event"}
{"id": 2, "name": "Another interesting event"}
{"id": 3, "name": "An event of monumental importance"}
@@ -0,0 +1,61 @@
import sys
from awsglue.utils import getResolvedOptions
from awsglue.transforms import ApplyMapping
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

# Collect the arguments passed in by the glue.Job run.
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "TempDir",
        "ConnectionName",
        "GlueDBName",
        "GlueDBTableName",
        "RedshiftRoleARN",
        "RedshiftDBName",
        "RedshiftDBTableName",
    ],
)

glueContext = GlueContext(SparkContext.getOrCreate())

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Extract all unprocessed data from the Glue catalog.
source0 = glueContext.create_dynamic_frame.from_catalog(
    database=args["GlueDBName"],
    table_name=args["GlueDBTableName"],
    additional_options={
        "jobBookmarkKeys": ["id"],
        "jobBookmarkKeysSortOrder": "asc",
    },
    transformation_ctx="source0",
)

# Transform the data (mostly just to show how to do so).
transformed0 = ApplyMapping.apply(
    frame=source0,
    mappings=[
        ("id", "int", "event_id", "int"),
        ("name", "string", "event_name", "string"),
    ],
)

# Load the data into the Redshift database.
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=transformed0,
    catalog_connection=args["ConnectionName"],
    connection_options={
        "database": args["RedshiftDBName"],
        "dbtable": args["RedshiftDBTableName"],
        "aws_iam_role": args["RedshiftRoleARN"],
    },
    redshift_tmp_dir=args["TempDir"],
)

# Call commit() to update the job bookmark for the next run.
job.commit()
@@ -0,0 +1,6 @@
//> using scala "3.3.1"
//> using options -Werror -Wunused:all -Wvalue-discard -Wnonunit-statement
//> using dep "org.virtuslab::besom-core:0.4.0-SNAPSHOT"
//> using dep "org.virtuslab::besom-aws:6.32.0-core.0.4-SNAPSHOT"

//> using repository sonatype:snapshots