Last updated on November 30, 2023
In the vast realm of data processing, orchestrating workflows becomes crucial to ensure tasks run efficiently and reliably. Apache Airflow has revolutionized this aspect, providing a flexible platform to define, schedule, and monitor workflows. Combining it with Amazon Managed Workflows for Apache Airflow (MWAA), we can supercharge our workflow setup without the overhead of manual maintenance. In this guide, we dive deep into constructing your first Directed Acyclic Graph (DAG) using Apache Airflow, elucidating each component and step involved.
Directed Acyclic Graph (DAG)
At its core, a DAG is a collection of vertices and edges, where each edge has a direction, and there are no cycles. This means you can’t traverse the graph and come back to where you started. In the context of Airflow, each vertex or node is a task, and the edges define the order in which tasks run. The acyclic nature ensures there are no infinite loops in execution, making it an ideal structure for workflows where tasks have clear dependencies.
Our First DAG
Configuration:
a. default_args:
Every DAG in Airflow supports a default_args parameter. This is a dictionary of default parameters that your tasks will inherit unless explicitly overridden. Common configurations include:
- owner: Specifies who owns this DAG. It’s more of a tag, and doesn’t control permissions or anything operational.
- depends_on_past: When set to True, the current task instance will depend on the success of the previous one. This is handy if tasks in the current run depend on data from the previous runs.
- start_date: The date at which your tasks will start running. This isn’t the date the DAG is created or started; it’s more of a logical date.
- email_on_failure and email_on_retry: Controls email notifications upon task failures or retries.
- retries: The number of times a failed task should be retried before it is marked as failed.
In our example:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 30),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}
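These defaults feed into the DAG object that the operators below reference as dag. The article doesn't show that declaration explicitly, so here is a minimal sketch of how it might look; the dag_id matches the my_first_dag name seen later in the Airflow UI, while the schedule_interval and catchup values are illustrative assumptions:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Illustrative DAG declaration; schedule and catchup are assumptions, adjust to your needs.
dag = DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    schedule_interval='@daily',  # run once a day
    catchup=False,               # don't backfill intervals between start_date and today
)

With catchup=False, Airflow won't try to backfill every schedule interval between the start_date and the current date, which keeps first experiments predictable.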
Extract-Transform-Load (ETL) Code
For this article, I wrote a basic ETL script that serves as the main logic behind our DAG:
a. Tasks and Operators:
Airflow tasks are instantiated through operators. Operators determine what gets done by a task. There’s a wide variety of operators available in Airflow, from running SQL queries to executing Python functions, as we see in our example using the PythonOperator.
# provide_context=True is only required on Airflow 1.10; Airflow 2 passes the task
# context to the callable automatically and treats this flag as a deprecated no-op.
task_extract = PythonOperator(
    task_id='extract',
    python_callable=extract,
    provide_context=True,
    dag=dag,
)

task_transform = PythonOperator(
    task_id='transform',
    python_callable=transform,
    provide_context=True,
    dag=dag,
)

task_load = PythonOperator(
    task_id='load',
    python_callable=load,
    provide_context=True,
    dag=dag,
)
b. Extract-Transform-Load (ETL):
The code reflects a simple ETL process:
- extract: We extract data from a CSV URL and push it to XCom (a means of communication between tasks in Airflow).
- transform: We pull the data from XCom, make some transformations like renaming columns, and push the transformed data back to XCom.
- load: We pull the transformed data and load it into an S3 bucket.
Code example:
import logging
from datetime import datetime
from io import StringIO

import boto3
import pandas as pd


def extract(**kwargs):
    logging.info('Starting extract function')
    url = 'https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv'
    df = pd.read_csv(url)
    # Note: pushing a DataFrame through XCom only works if XCom pickling is enabled
    # (core.enable_xcom_pickling) or a custom XCom backend handles it; for larger
    # datasets, write intermediate results to S3 instead.
    kwargs['ti'].xcom_push(key='extracted_data', value=df)
    logging.info('Extract function completed')
    logging.info(df.head(5))


def transform(**kwargs):
    logging.info('Starting transform function')
    data = kwargs['ti'].xcom_pull(key='extracted_data')
    logging.info(data.head(5))
    # Remove "" in column names
    data.columns = data.columns.str.replace('"', '')
    # Remove whitespace in column names
    data.columns = data.columns.str.strip()
    data['Total'] = data['1958'] + data['1959'] + data['1960']
    kwargs['ti'].xcom_push(key='transformed_data', value=data)
    logging.info('Transform function completed')


def load(**kwargs):
    logging.info('Starting load function')
    data = kwargs['ti'].xcom_pull(key='transformed_data')
    s3 = boto3.resource('s3')
    bucket = 'my-airflow-dags-bucket'
    csv_buffer = StringIO()
    data.to_csv(csv_buffer)
    filename = f"airtravel/transformed/{datetime.now().strftime('%Y-%m-%d')}.csv"
    s3.Object(bucket, filename).put(Body=csv_buffer.getvalue())
    logging.info('Load function completed, data loaded to s3')
c. Task Dependencies:
One of Airflow’s core features is determining the order in which tasks run, denoted by their dependencies. Using the >> and << operators, we can define the sequence in which tasks are executed. In our example, the extract task will run first, followed by transform, and then load:
task_extract >> task_transform >> task_load
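The bit-shift syntax above is the most common way to wire tasks together, but the same ordering can be expressed in a couple of equivalent ways; which one you use is purely a matter of style:

from airflow.models.baseoperator import chain

# Equivalent to task_extract >> task_transform >> task_load
chain(task_extract, task_transform, task_load)

# Or, using explicit methods on the tasks themselves
task_extract.set_downstream(task_transform)
task_transform.set_downstream(task_load)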
With this foundation, you’re well on your way to crafting more intricate DAGs tailored to complex workflows. The strength of Airflow lies in its flexibility, and with each new DAG, you’ll uncover more of its potential. Stay tuned for our next section, where we delve into uploading this DAG to Amazon S3.
Uploading the DAG and Requirements to S3
Amazon Simple Storage Service (S3) is the heart and soul of many AWS services, and Amazon Managed Workflows for Apache Airflow (MWAA) is no exception. In the context of MWAA, S3 serves as the repository where your DAGs and any supplementary files (e.g., Python libraries in a requirements.txt file) reside. By leveraging the durability and scalability of S3, MWAA ensures that your workflows have access to the necessary code and libraries, making the deployment and scaling processes smoother.
Creating an S3 Bucket for Your DAGs
Before uploading anything, you’ll need an S3 bucket. It’s recommended to create a dedicated bucket for your Airflow needs to maintain organization and ensure the correct permissions.
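If you prefer to script this step, a minimal sketch with boto3 might look like the following; the region is a placeholder, and the bucket name matches the one used in the load function above. Note that MWAA requires versioning to be enabled and public access to be blocked on the bucket:

import boto3

s3 = boto3.client('s3', region_name='us-east-1')  # placeholder region
bucket = 'my-airflow-dags-bucket'                 # same bucket name used in load()

# Create the bucket (regions other than us-east-1 need a LocationConstraint)
s3.create_bucket(Bucket=bucket)

# MWAA requires versioning on the bucket that stores your DAGs
s3.put_bucket_versioning(
    Bucket=bucket,
    VersioningConfiguration={'Status': 'Enabled'},
)

# MWAA also requires that public access to the bucket is blocked
s3.put_public_access_block(
    Bucket=bucket,
    PublicAccessBlockConfiguration={
        'BlockPublicAcls': True,
        'IgnorePublicAcls': True,
        'BlockPublicPolicy': True,
        'RestrictPublicBuckets': True,
    },
)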
Uploading Your DAG to S3
Once your S3 bucket is set up, it’s time to upload the DAG. Navigate to the newly created bucket, create a directory (often named dags), and upload your .py file containing the DAG code.
Uploading Additional Requirements to S3
If your DAG uses external Python libraries not included in the base MWAA environment, you’ll need a requirements.txt file. This file lists all the Python packages your DAG depends on. Just as with the DAG, upload this file to your S3 bucket, typically in a directory named requirements or at the root.
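For this article's DAG, the requirements.txt would likely need to list pandas, while boto3 is typically already available in the MWAA base image. The upload itself can also be scripted; a hedged sketch using boto3, where the local file names and key prefixes are assumptions based on the layout described above:

import boto3

s3 = boto3.client('s3')
bucket = 'my-airflow-dags-bucket'  # the bucket created earlier

# The DAG file goes under the dags/ prefix that MWAA will be pointed at
s3.upload_file('my_first_dag.py', bucket, 'dags/my_first_dag.py')

# requirements.txt at the bucket root (or a dedicated prefix, if you prefer)
s3.upload_file('requirements.txt', bucket, 'requirements.txt')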
By ensuring that your DAGs and any necessary dependencies are securely stored in S3, you’re setting the stage for a seamless integration with MWAA. In the next sections, we’ll walk through creating your first Airflow environment and how to associate it with this S3 bucket.
Creating your First Airflow Environment
The Amazon Managed Workflows for Apache Airflow (MWAA) service makes it easier than ever to deploy an Airflow environment on AWS. Once your DAGs and their dependencies are safely stored in S3, the next step is to establish an MWAA environment. This environment is essentially an encapsulated Airflow instance managed by AWS, freeing you from the intricacies of installation, scaling, and maintenance.
Navigating to the Amazon MWAA Console
Your journey begins at the AWS Management Console. Here, under the Application Integration section, you’ll find the Amazon MWAA service. Clicking on it takes you to the MWAA dashboard.
Initiating the “Create Environment” Process
Once inside the MWAA dashboard, you’ll spot a “Create environment” button. Clicking this propels you into the environment creation wizard.
Basic configuration
In the environment details, you can specify the Name, Airflow Version, and the Weekly Maintenance Window Start.
Specifying the S3 Path for DAGs
The wizard will prompt you to specify the S3 path where your DAGs are located. This is crucial as it lets MWAA know where to fetch your workflows from.
Including the requirements.txt
If you’ve uploaded a requirements.txt file, you’ll need to provide its path as well. This ensures your environment is equipped with the necessary Python packages.
Configuring Additional Settings
There are several other settings that you can optionally configure:
- Airflow version: MWAA supports multiple versions of Airflow, so ensure you choose the one best suited for your DAGs.
- Other optional configurations might include custom plugins, environment class (size), and networking settings.
Networking Configuration
Virtual Private Cloud (VPC)
Define the networking infrastructure for your Airflow environment. Ensure you select an appropriate VPC where your Airflow environment will reside. Alternatively, you can click Create MWAA VPC, which launches a CloudFormation stack that provisions a VPC configured to MWAA's requirements.
Note: An environment needs 2 private subnets in different availability zones.
Web Server Access
Decide if you want your Airflow UI to be accessed within a private network (no internet access) or from a public network. For the simplicity of this article, I will simply select a public network.
Note: For private network access, the Airflow web server is reached via a VPC endpoint. Ensure you’ve set up the endpoint correctly.
Security Group
This defines the allowed traffic between your environment and your web server.
Environment Class
Choose the capacity and CPU settings appropriate for your use case. This includes:
- DAG capacity
- Scheduler CPU
- Worker CPU
- Web server CPU
Set the number of workers your environment will need:
- Maximum worker count
- Minimum worker count
Finally, choose the number of schedulers for your environment.
Encryption and Monitoring
Encryption
Amazon MWAA encrypts your data using either an AWS-owned KMS key or a customer-managed KMS key that you provide.
Monitoring and Airflow Logging Configuration
Enable CloudWatch Metrics to monitor environment performance metrics. Customize which Airflow tasks and events are logged.
Permissions
Execution Role
This is crucial as it defines the IAM role used by your environment to access your DAG code, write logs, and perform other related actions.
Note: The execution role is critical. It needs the correct permissions to interact with the S3 bucket where your DAGs reside; without them, MWAA cannot load your workflows. You may also need to add other permissions, such as s3:PutObject, for tasks that upload data into a bucket, as our load task does. A hedged example of granting this access is sketched below.
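As a sketch only: one way to grant the S3 access used in this article is to attach an inline policy to the execution role. The role and policy names below are placeholders, and the bucket is the one used earlier; tighten the actions and resources to your actual needs:

import json

import boto3

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket", "s3:PutObject"],
            "Resource": [
                "arn:aws:s3:::my-airflow-dags-bucket",
                "arn:aws:s3:::my-airflow-dags-bucket/*",
            ],
        }
    ],
}

# Attach the statement as an inline policy on the MWAA execution role
iam = boto3.client('iam')
iam.put_role_policy(
    RoleName='MyMWAAExecutionRole',  # placeholder role name
    PolicyName='mwaa-s3-access',     # placeholder policy name
    PolicyDocument=json.dumps(policy),
)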
Once all sections are correctly configured, proceed to review and create your environment. Ensure all settings align with your intended Airflow setup and operational needs. After environment creation, you can monitor its status and manage configurations directly from the Amazon MWAA console. Note that the creation of the environment might take 20 to 30 minutes.
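If you would rather automate this setup than click through the wizard, the same settings can be supplied through the MWAA API. The sketch below is an assumption-heavy outline using boto3's create_environment call; the ARNs, subnet and security group IDs, version string, and sizing values are placeholders you would replace with your own:

import boto3

mwaa = boto3.client('mwaa')

mwaa.create_environment(
    Name='MyFirstAirflowEnvironment',
    AirflowVersion='2.7.2',                                  # placeholder version
    SourceBucketArn='arn:aws:s3:::my-airflow-dags-bucket',
    DagS3Path='dags',
    RequirementsS3Path='requirements.txt',
    ExecutionRoleArn='arn:aws:iam::123456789012:role/MyMWAAExecutionRole',  # placeholder
    NetworkConfiguration={
        'SubnetIds': ['subnet-aaaa1111', 'subnet-bbbb2222'],  # two private subnets
        'SecurityGroupIds': ['sg-cccc3333'],                  # placeholder
    },
    WebserverAccessMode='PUBLIC_ONLY',  # matches the public network choice above
    EnvironmentClass='mw1.small',
    MinWorkers=1,
    MaxWorkers=2,
    Schedulers=2,
)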
Airflow UI
The Airflow UI provides an intuitive interface for monitoring and managing your Directed Acyclic Graphs (DAGs), which are collections of tasks with defined dependencies.
To access the Airflow UI:
- Navigate to the ‘Airflow environments’ dashboard.
- Locate the environment you wish to view. In this example, it’s MyFirstAirflowEnvironment.
- On the right side of the environment’s row, you’ll see an option labeled “Open Airflow UI”. Click on it.
This action will launch the Airflow UI in a new browser window or tab.
Overview of the Airflow UI
Once you’ve entered the Airflow UI:
- The homepage displays a list of all your DAGs. Each DAG’s name, owner, and schedule can be viewed at a glance.
- In our example, you should see a DAG named my_first_dag.
- Above the DAG list, there’s a menu bar with options like ‘DAGs’, ‘Datasets’, ‘Security’, ‘Browse’, ‘Admin’, and ‘Docs’. This allows for easy navigation to different sections of the UI.
- To the right of the DAG list, you can see the status of recent tasks, the last time each DAG ran, and when it’s scheduled to run next. There’s also an option to refresh the DAGs list and search for specific DAGs.
Manually Running the DAG
Sometimes you'll want to trigger a DAG run manually, outside of its regular schedule. For our purposes, we just want to check that the DAG works:
- Locate my_first_dag in the list.
- On the right side of its row, under the “Actions” column, click on the “play” icon (▶️). This will initiate a manual run of the DAG.
- A confirmation popup will appear. Click “OK” to proceed.
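If you'd rather trigger the run from outside the UI, MWAA exposes the Airflow CLI through an authenticated endpoint. A hedged sketch of that approach, using the environment name from above and assuming the requests library is available:

import base64

import boto3
import requests

# Exchange AWS credentials for a short-lived Airflow CLI token
mwaa = boto3.client('mwaa')
token = mwaa.create_cli_token(Name='MyFirstAirflowEnvironment')

# Run "dags trigger my_first_dag" against the MWAA CLI endpoint
resp = requests.post(
    f"https://{token['WebServerHostname']}/aws_mwaa/cli",
    headers={
        'Authorization': f"Bearer {token['CliToken']}",
        'Content-Type': 'text/plain',
    },
    data='dags trigger my_first_dag',
)

result = resp.json()
print(base64.b64decode(result['stdout']).decode())
print(base64.b64decode(result['stderr']).decode())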
Visual Checking of the DAG
To visualize the flow and dependencies of your DAG:
- Click on the name my_first_dag in the DAG list.
- You’ll be taken to a detailed page for that DAG. Here, you can see various tabs like ‘Grid’, ‘Graph’, ‘Code’, and ‘Details’.
- Click on the ‘Graph’ tab.
- This displays a visual representation of your DAG. Each rectangle represents a task, and the arrows between them show the flow and dependencies.
In our example, you’ll see three tasks named extract, transform, and load. The arrows indicate the order in which these tasks run.
Observing Logs in Amazon CloudWatch
Amazon CloudWatch captures the logs from Airflow tasks, providing an external platform for monitoring and troubleshooting beyond the Airflow UI. In this setup, each individual task in an Airflow DAG generates its own set of logs, allowing for granular insights and pinpointed debugging for specific task executions.
Here is an image of the logs of the load task:
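Beyond the console, the same logs can be pulled programmatically. A hedged sketch with boto3, assuming the log group follows MWAA's usual airflow-<environment-name>-Task naming, which you should verify in your own account:

import boto3

logs = boto3.client('logs')

# MWAA typically writes task logs to a group named airflow-<env-name>-Task
response = logs.filter_log_events(
    logGroupName='airflow-MyFirstAirflowEnvironment-Task',
    filterPattern='load',  # narrow down to messages mentioning the load task
    limit=50,
)

for event in response['events']:
    print(event['message'])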
Final Remarks
In today’s digital age, where data drives decision-making, the significance of efficient workflow orchestration cannot be overstated. It forms the backbone of many data processing tasks, ensuring that complex workflows are executed seamlessly and efficiently.
Choosing Amazon MWAA for Apache Airflow is a strategic decision that offers numerous benefits. Not only does it provide a managed environment for running Airflow, but it also integrates natively with other AWS services, enhancing scalability, reliability, and ease of use.
Whether you're just getting started or already familiar with Airflow, we encourage you to dive deeper. Explore more intricate DAGs, experiment with advanced features, and harness the full potential of MWAA. The platform is versatile and offers a plethora of tools and configurations that can cater to both simple and complex use cases.
Lastly, while AWS provides a robust platform with a wide range of services, it’s imperative to keep an eye on associated costs. Always monitor your usage, set up alerts for unexpected spikes, and remember to clean up unused resources. This ensures not just efficient workflows but also cost-effective operations.
P.S. A heartfelt thank you to everyone who has been following along on this journey through Amazon MWAA and Apache Airflow. We hope it has been enlightening and valuable. As a friendly reminder, please don’t forget to delete your Airflow Environment if you’re not using it, to avoid incurring unnecessary costs. It’s always a good practice to keep your AWS environment clean and efficient. Thanks again and happy data orchestrating!
Resources:
https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html