AWS Glue is a fully managed, cloud-native AWS service for performing extract, transform and load (ETL) operations across a wide range of data sources and destinations.
It supports connectivity to Amazon Redshift, RDS and S3, as well as to a variety of third-party database engines running on EC2 instances. It’s serverless, so users don’t need to provision and maintain underlying compute and networking resources. It runs on an Apache Spark environment and so is designed to handle very large workloads efficiently.
It consists of 3 components: a data catalogue, a job authoring environment and a job execution environment. The data catalogue can be used independently of the others, since it can serve as a metadata repository for Amazon Athena, Amazon EMR and Amazon Redshift Spectrum. Job authoring involves modifying or writing scripts in Python or Scala, where Glue implements an extension of the PySpark Python dialect that includes a construct called a DynamicFrame.
DynamicFrames are an extension of Spark DataFrames and allow for different DynamicRecords (i.e. self-describing rows) in the DynamicFrame to have different schema. This is in contrast to DataFrames, where all rows must have the same schema. DynamicFrames better support the ‘messiness’ that can be found in data sets that are the subject of ETLs.
Here, we’re using Glue to load data from CSV files stored in S3 into an Amazon Redshift database, applying a number of DynamicFrame transformations along the way.
Firstly, an IAM service role is required for Glue execution, e.g. for testing connections, running crawlers and executing ETL jobs. For this example, I’ve set up a new role with 2 policies: a Glue managed service policy that gives Glue access to resources that it needs to operate (e.g. S3 for storing scripts), and a policy for accessing the S3 bucket where our source data is stored and that we write error rows to.
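As a rough sketch, the second of those policies might look like the following, where the bucket name my-etl-bucket is a placeholder for your own bucket:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-etl-bucket",
        "arn:aws:s3:::my-etl-bucket/*"
      ]
    }
  ]
}
```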
Secondly, in the case where Glue needs to access data stores in a VPC, a VPC endpoint is required for S3. Glue sets up elastic network interfaces to enable jobs to connect securely to resources within a VPC, where each elastic network interface is assigned a private IP address from the IP address range within the relevant subnet. Use of an S3 VPC endpoint allows Glue to use private IP addresses to access Amazon S3, meaning that traffic between the VPC and S3 stays private and does not leave the Amazon network (i.e. it does not use the public internet).
Thirdly, and again in the case where Glue needs to access data stores in a VPC, a self-referencing rule is required in the security group assigned to each data store (e.g. Amazon Redshift or RDS). This rule needs to allow access over all TCP ports (inbound and outbound) with the security group itself as a source. This allows for traffic between Glue components and the data store in the VPC.
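The self-referencing rule can also be added programmatically. A minimal sketch using boto3 is shown below, where the security group ID is a placeholder; an equivalent call to authorize_security_group_egress would cover the outbound direction. This requires AWS credentials to run:

```python
import boto3

ec2 = boto3.client("ec2")

# Placeholder ID for the security group assigned to the data store
sg_id = "sg-0123456789abcdef0"

# Inbound self-referencing rule: allow all TCP ports where the source
# is the security group itself
ec2.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[{
        "IpProtocol": "tcp",
        "FromPort": 0,
        "ToPort": 65535,
        "UserIdGroupPairs": [{"GroupId": sg_id}],
    }],
)
```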
The scenario I’m modelling here is that of incrementally loading new rows into the products dimension table of a data warehouse running on Redshift. The source consists of one CSV file with product rows and another CSV file with category rows, where there’s a foreign key/primary key relationship between the products data and the category data over the category column. My initial files for loading are as follows:
The Glue ETL will join this data so as to denormalise the schema, as is typical for data warehouse star schema dimension tables.
The 2 CSV files are stored in different directory paths within the same S3 bucket, which allows separate schema to be created for them in the Glue catalogue. I created the catalogue schema for them manually, since the Glue crawler wanted to combine them into a single schema. The table schema are as follows:
I added the skip.header.line.count = 1 table property to both tables, since the CSV files both have headers.
The ETL will allow for the initial load of this data, and also for incremental loads of new files (i.e. with new rows) that are added to the S3 bucket.
The ETL script is written using Python 3 and makes use of the Glue extensions to PySpark. It’s in my GitHub repository here and is described in the sections below.
(1) Basic setup

Basic setup is completed first, including defining the ETL source and destination information:
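The script follows the standard Glue job boilerplate; a minimal sketch is below, where the database, table, connection and bucket names are placeholders standing in for the real values (this only runs inside the Glue environment):

```python
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

# Resolve the job name passed in by the Glue runtime
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session

# Initialise the job; required for job bookmarks to function
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Source and destination details (placeholder names)
catalogue_database = "products_db"         # Glue catalogue database
products_table = "products_csv"            # products CSV table
categories_table = "categories_csv"        # categories CSV table
redshift_connection = "redshift-conn"      # Glue connection to Redshift
error_path = "s3://my-etl-bucket/errors/"  # destination for error rows
```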
(2) Load the CSV files
Both CSV files are loaded via their catalogue entries. Only one uses the transformation_ctx parameter – this is discussed in the bookmarks section below.
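A sketch of the two loads, assuming the catalogue database and table names defined in the setup step:

```python
# Load the products data; transformation_ctx enables job bookmarks for
# this source, so previously processed product files are skipped
products_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="products_db",
    table_name="products_csv",
    transformation_ctx="products_source",
)

# Load the categories data with no transformation_ctx: every run must
# read all category files so they can be joined to new product rows
categories_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="products_db",
    table_name="categories_csv",
)

# Log the row counts read on this run
print("product rows:", products_dyf.count())
print("category rows:", categories_dyf.count())
```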
(3) Perform the transformations
A number of transformations are performed here:
– rows with “deleted = y” are deleted
– the “deleted” column is dropped
– the date string is converted into a date type, where any resulting null dates are written to an error frame
– the 2 frames of CSV data are joined, first renaming columns to avoid duplicate column names after the join
– the category key column present in both original frames is deleted
(Some of the transformations below could have been chained together, but are kept separate for readability.)
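The transformations can be sketched as follows. The column names date_added and category, the date format and the renamed key cat_key are assumptions for illustration; the frames products_dyf and categories_dyf are those loaded in step 2:

```python
from datetime import datetime
from awsglue.transforms import Filter, Map, Join, DropFields, RenameField

# Remove rows flagged as deleted, then drop the flag column itself
products_dyf = Filter.apply(frame=products_dyf,
                            f=lambda rec: rec["deleted"] != "y")
products_dyf = DropFields.apply(frame=products_dyf, paths=["deleted"])

# Convert the date string to a date type; null it out if parsing fails
def parse_date(rec):
    try:
        rec["date_added"] = datetime.strptime(
            rec["date_added"], "%Y-%m-%d").date()
    except (ValueError, TypeError):
        rec["date_added"] = None
    return rec

products_dyf = Map.apply(frame=products_dyf, f=parse_date)

# Rows whose date failed to parse form a separate error frame
error_dyf = Filter.apply(frame=products_dyf,
                         f=lambda rec: rec["date_added"] is None)

# Rename the category key in one frame to avoid duplicate column names
# after the join, then join and drop both copies of the key
categories_dyf = RenameField.apply(frame=categories_dyf,
                                   old_name="category", new_name="cat_key")
joined_dyf = Join.apply(products_dyf, categories_dyf, "category", "cat_key")
joined_dyf = DropFields.apply(frame=joined_dyf, paths=["category", "cat_key"])
```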
(4) Write to Redshift and S3
The joined data is written to the Products table in Redshift and any error rows from the date conversion are written to a file in S3:
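A sketch of the two writes, assuming the joined_dyf and error_dyf frames from the transformation step and the placeholder connection and bucket names from the setup step:

```python
# Write the joined product data to the products table in Redshift,
# via the Glue connection and a temporary S3 staging directory
glue_context.write_dynamic_frame.from_jdbc_conf(
    frame=joined_dyf,
    catalog_connection="redshift-conn",
    connection_options={"dbtable": "products", "database": "warehouse"},
    redshift_tmp_dir="s3://my-etl-bucket/temp/",
)

# Write any error rows from the date conversion to S3 as CSV
glue_context.write_dynamic_frame.from_options(
    frame=error_dyf,
    connection_type="s3",
    connection_options={"path": "s3://my-etl-bucket/errors/"},
    format="csv",
)

# Commit so the job bookmark records what has been processed
job.commit()
```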
The Glue job for the above script runs using the Spark 2.4/Python 3 runtime and has job bookmarks enabled, continuous logging enabled, a maximum capacity of 2 DPUs (to minimise cost) and a maximum concurrency of 1 (advised when using bookmarks).
The initial run of the job results in the following table in Redshift:
So the denormalised category data can be seen here, along with the original string dates converted to a date type and the incorrectly formatted date from the source data nulled out. Additionally, an error row regarding the null date was written to an S3 file, as per the script above.
Bookmarks are enabled for the job to allow newly arriving product and category data to be processed without reprocessing previously loaded product data. However, we only want to ignore previously existing product files for successive runs of the job – all category files (initial and incremental) must always be loaded, because any of these could form the right-hand side of the join for new products. To achieve this, the transformation_ctx parameter was only passed into the loading and transformation of the products file in the script, not the categories file.
To prove that this worked, the following new products file was added to the same S3 path as the existing products file:
When the job was run again, both the logging of CSV row counts and the resulting table in Redshift showed that the previous products file was ignored, this new products file was read, and the previous categories file was also read (i.e. to join with and denormalise the above category column).