Course Content
Apache Airflow Installation on Linux / Ubuntu / CentOS
0/1
Mastering Apache Airflow: Building and Managing Dynamic, Scalable Data Pipelines with DAGs
0/1
Apache Airflow
About Lesson

Introduction 

Apache Airflow is an open-source platform that allows users to programmatically create, schedule, and manage workflows. Its primary feature is the Directed Acyclic Graph (DAG), which represents a series of tasks executed in a specific order to complete a data pipeline. This detailed blog post delves into the inner workings of Apache Airflow, focusing on creating and managing DAGs, best practices, and advanced use cases.

Understanding Apache Airflow 


Apache Airflow is a highly customizable, extensible, and scalable platform that helps data engineers define, schedule, and monitor complex data pipelines. It is designed to handle dependencies between tasks and enables data processing jobs to run in parallel or sequentially, based on the defined workflow.

Directed Acyclic Graphs (DAGs) in Airflow 



A Directed Acyclic Graph (DAG) is a collection of tasks connected by directed edges, where each task represents a specific operation within the data pipeline. DAGs in Airflow have the following properties:

  • Directed : Edges have a direction, meaning tasks have a specific order of execution.
  • Acyclic : There are no cycles in the graph, ensuring tasks are executed only once.
  • Graph : A mathematical structure representing a set of tasks and their relationships.

Creating a DAG in Apache Airflow 


To create a DAG in Apache Airflow, follow these steps:

a. Import required libraries and modules:

from datetime import datetime, timedelta 
from airflow import DAG 
from airflow.operators.dummy import DummyOperator 
from airflow.operators.python import PythonOperator 

b. Define default arguments for the DAG:

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2022, 1, 1), 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

c. Instantiate a DAG object:

dag = DAG( 
    'my_first_dag', 
    default_args=default_args, 
    description='A simple DAG example', 
    schedule_interval=timedelta(days=1), 
    catchup=False, 
) 

d. Define tasks within the DAG:

def print_hello(): 
    print("Hello from task_1!") 
    start_task = DummyOperator(task_id='start', dag=dag) 
    task_1 = PythonOperator(task_id='task_1', python_callable=print_hello, dag=dag) 
    end_task = DummyOperator(task_id='end', dag=dag) 

e. Set up task dependencies:

start_task >> task_1 >> end_task 

Complete Example of DAG 


In this complete example, we’ll create an Airflow DAG that reads data from a CSV file, processes the data, and stores the results in a new CSV file. We will use PythonOperator to perform the tasks.

  1. Requirements

  • Apache Airflow installed and configured
  • A sample CSV file named input_data.csv containing the following data:
Name,Age John,30 Alice,25 Bob,22 
  1. Creating the DAG

Create a new Python file named csv_processing_dag.py in your Airflow DAGs folder and add the following code:

import os 
import pandas as pd 
from datetime import datetime, timedelta 
from airflow import DAG 
from airflow.operators.python import PythonOperator 

# Define the default arguments for the DAG 
default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime(2022, 1, 1), 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=5), 
} 

# Instantiate the DAG object 

dag = DAG( 
    'csv_processing_dag', 
    default_args=default_args, 
    description='A simple CSV processing example', 
    schedule_interval=timedelta(days=1), 
    catchup=False, 
) 

# Define the read_csv task 
def read_csv(**kwargs): 
    input_file = 'input_data.csv' 
    df = pd.read_csv(input_file) 
    kwargs['ti'].xcom_push(key='data', value=df) 

read_csv_task = PythonOperator( 
    task_id='read_csv', 
    python_callable=read_csv, 
    provide_context=True, dag=dag, 
) 

# Define the process_data task 
def process_data(**kwargs): 
    df = kwargs['ti'].xcom_pull(key='data', task_ids='read_csv') 
    df['Age'] = df['Age'] * 2 
    kwargs['ti'].xcom_push(key='processed_data', value=df) 
    
process_data_task = PythonOperator( 
    task_id='process_data', 
    python_callable=process_data, 
    provide_context=True, dag=dag, ) 
    
# Define the write_csv task 
def write_csv(**kwargs): 
    output_file = 'output_data.csv' 
    df = kwargs['ti'].xcom_pull(key='processed_data', task_ids='process_data') 
    df.to_csv(output_file, index=False) 
    
write_csv_task = PythonOperator( 
    task_id='write_csv', 
    python_callable=write_csv, 
    provide_context=True, dag=dag, 
) 

# Set up task dependencies 
read_csv_task >> process_data_task >> write_csv_task 
  1. Executing the DAG

Once you’ve created the DAG, Airflow will automatically discover it. Go to the Airflow web UI, find the csv_processing_dag , and turn it on. The DAG will start executing based on the defined schedule.

Alternatively, you can trigger the DAG manually using the Airflow CLI:

airflow dags trigger csv_processing_dag 

After the DAG completes, you’ll see a new CSV file named output_data.csv containing the processed data:

Name,Age John,60 Alice,50 Bob,44 

This example demonstrates how to create a simple data processing pipeline using Apache Airflow. You can further extend this example by incorporating additional data sources, processing steps, or output formats, as required by your specific use case.

Managing DAGs in Apache Airflow 


To manage DAGs in Apache Airflow, place the DAG Python script in the dags folder in the Airflow home directory. Airflow will automatically discover and import the DAG. Use the Airflow web UI or CLI commands to manage, monitor, and troubleshoot DAG runs.

Best Practices for Designing DAGs 


  • Modularize and reuse code: Break down complex workflows into smaller, reusable components.
  • Idempotency : Ensure that tasks produce the same results regardless of the number of times they are executed.
  • Task granularity : Keep tasks small and focused on a single operation to enable parallelism and improve fault tolerance.
  • Use dynamic task generation : Generate tasks programmatically based on input data to create flexible and scalable workflows.
  • Monitor and log: Make use of Airflow’s built-in logging and monitoring capabilities to identify issues and optimize performance.

Advanced Use Cases and Features 


a. SubDAGs: Break down complex workflows into smaller, more manageable sub-workflows by using SubDAGs. SubDAGs are essentially DAGs within a parent DAG, which allows for better organization and modularization of workflows.

b. Branching: Use branching to conditionally execute tasks based on the output of previous tasks. The BranchPythonOperator allows you to define a function that returns the next task to execute based on custom logic.

c. Trigger rules: Control the execution of tasks based on the state of their upstream tasks. By default, a task will execute once all its upstream tasks have succeeded, but you can change this behavior by setting custom trigger rules.

d. XComs: Share data between tasks using Airflow’s XCom (cross-communication) feature. XComs allow tasks to exchange small amounts of data in the form of key-value pairs.

e. SLAs: Define Service Level Agreements (SLAs) for tasks or the entire DAG to set expectations for task completion times. Airflow can alert you when SLAs are not met, allowing you to take corrective action.

f. Connection management: Manage external connections and credentials in a centralized manner using Airflow’s built-in connection management system.

Monitoring and Troubleshooting DAGs 


Airflow provides several tools for monitoring and troubleshooting DAG runs, including:

  • Web UI: A user-friendly interface that provides an overview of DAGs, task runs, and logs.
  • CLI: Airflow’s command-line interface allows you to manage, monitor, and troubleshoot DAGs using terminal commands.
  • Logging: Airflow automatically logs task execution information, which can be helpful for debugging and identifying performance issues.
  • Metrics: Airflow can be integrated with monitoring systems like Prometheus or StatsD to gather and visualize metrics related to task execution and resource usage.