Setup & Environment

The fastest path to a working Airflow environment is the official Docker Compose setup. It gives you a production-like stack (PostgreSQL metadata DB, Redis broker, Celery workers, scheduler, webserver) without any local Python dependency wrestling.

Docker Compose (Recommended)

# 1. Download the official docker-compose.yaml for Airflow 2.10.4
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.4/docker-compose.yaml'

# 2. Create required directories
mkdir -p ./dags ./logs ./plugins ./config

# 3. Set your host user ID so containers write files you can edit
echo -e "AIRFLOW_UID=$(id -u)" > .env

# 4. Initialize the metadata database and create the admin user
docker compose up airflow-init

# 5. Start all services in the background
docker compose up -d

# 6. Watch scheduler and webserver come up
docker compose logs -f airflow-webserver airflow-scheduler

Access the UI at http://localhost:8080 — username airflow, password airflow.

What the Docker stack includes
The Compose file starts: webserver (UI + REST API), scheduler (parses DAGs, submits tasks), worker (executes tasks via Celery), triggerer (deferred operators), postgres (metadata DB), and redis (Celery broker). It mirrors a real production deployment.

Local pip Install (for Development/Testing)

# Python 3.9–3.12 supported
python -m venv airflow-venv
source airflow-venv/bin/activate

# Airflow uses constraint files to avoid dependency conflicts
AIRFLOW_VERSION=2.10.4
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialize SQLite metadata DB and create admin user
airflow db migrate
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email [email protected] \
  --password admin

# Start webserver and scheduler in separate terminals
airflow webserver --port 8080
airflow scheduler

SQLite is for development only
The local pip install defaults to SQLite, which does not support parallel task execution. Use LocalExecutor with PostgreSQL for anything beyond single-task DAGs. The Docker setup is strongly preferred for realistic testing.
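To move a local install beyond SQLite, point Airflow at PostgreSQL and switch the executor. A minimal sketch, assuming a local Postgres with an airflow user and database (all names here are placeholders — adjust to your setup):

```shell
# Placeholders: adjust user, password, host, and database to your environment
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"

# Re-run migrations against the new metadata DB
airflow db migrate
```

With LocalExecutor and Postgres, tasks within a DAG Run can execute in parallel on a single machine.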

Verification Checklist

- The UI loads at http://localhost:8080 and you can log in
- airflow dags list (or docker compose exec airflow-webserver airflow dags list) shows your DAGs
- airflow dags list-import-errors reports no errors
- The UI shows a healthy scheduler heartbeat (no "scheduler is not running" banner)

Core Concepts

Airflow is a workflow orchestrator, not a data processor. It schedules and monitors pipelines defined as Python code — it does not move or transform data itself. Think of it as the traffic controller that decides when and where work runs, and tracks what happened.

Architecture

Component | Role
--- | ---
DAG | Directed Acyclic Graph — a Python file defining tasks and their dependencies
Task | A unit of work, defined by an Operator instance
Operator | Template for a task type (BashOperator, PythonOperator, etc.)
DAG Run | A single execution instance of a DAG for a specific logical date
Task Instance | A single execution of a Task within a DAG Run
Scheduler | Parses DAG files, decides when DAG Runs should be created, submits tasks to the executor
Executor | Mechanism for running tasks (Local, Celery, Kubernetes)
Metadata DB | PostgreSQL/MySQL database storing all DAG, run, and task state
Worker | Process or pod that actually executes a task instance
Webserver | Flask app serving the UI and REST API

The "Define in Python, Execute Anywhere" Philosophy

DAG files are pure Python code — no YAML configs, no XML. This means you get loops, conditionals, imports, and tests for free. The scheduler imports your DAG files repeatedly (every ~30 seconds by default) to detect changes, so DAG-level code must be fast and side-effect free. Heavy computation belongs inside task functions, not at module scope.

DAG files are imported frequently
The scheduler imports each DAG file on every parse cycle. Avoid network calls, database queries, or slow imports at the top level of a DAG file. These slow the scheduler and can cause parse errors that disable your entire DAG.
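The difference is easy to demonstrate without Airflow at all. A sketch, with hypothetical names, of keeping module scope cheap and deferring expensive work into the callable:

```python
import time

# BAD (module scope): this would run on EVERY scheduler parse cycle:
# config = requests.get("https://config.example.com/etl").json()

def fetch_config() -> dict:
    """Stand-in for an expensive lookup; runs only when the task executes."""
    time.sleep(0)  # placeholder for real network/DB latency
    return {"batch_size": 500}

def process_batch(**context) -> int:
    """A python_callable for a PythonOperator: heavy work lives in here."""
    config = fetch_config()
    return config["batch_size"]
```

At parse time the scheduler only imports this module, which now performs no I/O; the lookup happens when a worker actually runs the task.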

Task Instance Lifecycle

Task instances transition through these states:

scheduled → queued → running → success / failed / skipped / up_for_retry
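A rough sketch of those transitions as a lookup table. This is illustrative only — Airflow's real state machine has more states (e.g. deferred, upstream_failed, removed):

```python
# Simplified model of task-instance state transitions
TRANSITIONS: dict[str, set[str]] = {
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"success", "failed", "skipped", "up_for_retry"},
    "up_for_retry": {"scheduled"},  # retried tasks re-enter the queue
}

def is_valid_transition(src: str, dst: str) -> bool:
    """True if dst is a direct successor of src in this simplified model."""
    return dst in TRANSITIONS.get(src, set())
```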

DAGs

A DAG file is any Python file in the dags/ folder that contains a DAG object. There are two styles: the context manager (classic) and the @dag decorator (TaskFlow API, Airflow 2.0+).

Context Manager Style (Classic)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# default_args apply to all tasks unless overridden at task level
default_args = {
    "owner": "data-team",
    "depends_on_past": False,        # don't wait for previous run's same task
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # 5min, 10min, 20min...
    "execution_timeout": timedelta(hours=1),
}

with DAG(
    dag_id="daily_etl_pipeline",
    description="Extract, transform, load daily sales data",
    schedule="0 6 * * *",           # cron: daily at 06:00 UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # don't backfill missed runs
    default_args=default_args,
    tags=["etl", "sales", "daily"],
    max_active_runs=1,               # only one concurrent DAG run
    doc_md="""
    ## Daily ETL Pipeline
    Extracts sales data from the API, transforms it, and loads to the warehouse.
    Runs daily at 06:00 UTC.
    """,
) as dag:

    extract = BashOperator(
        task_id="extract_sales",
        bash_command="python /opt/scripts/extract.py --date {{ ds }}",
    )

    def transform_data(**context):
        execution_date = context["ds"]
        print(f"Transforming data for {execution_date}")
        # ... transformation logic

    transform = PythonOperator(
        task_id="transform_sales",
        python_callable=transform_data,
    )

    load = BashOperator(
        task_id="load_to_warehouse",
        bash_command="python /opt/scripts/load.py --date {{ ds }}",
    )

    extract >> transform >> load

Decorator Style (@dag)

from datetime import datetime, timedelta
from airflow.decorators import dag, task

@dag(
    dag_id="weekly_report",
    schedule="@weekly",             # shorthand: @hourly, @daily, @weekly, @monthly
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["reports"],
    default_args={"retries": 1, "retry_delay": timedelta(minutes=2)},
)
def weekly_report_dag():
    """Generate and send the weekly business report."""

    @task
    def fetch_metrics() -> dict:
        # Return value is automatically pushed as an XCom
        return {"revenue": 100_000, "orders": 4_200}

    @task
    def generate_report(metrics: dict) -> str:
        report = f"Revenue: ${metrics['revenue']:,}, Orders: {metrics['orders']:,}"
        return report

    @task
    def send_report(report: str):
        print(f"Sending: {report}")
        # ... email/Slack send logic

    # Wiring: pass return values directly between @task functions
    metrics = fetch_metrics()
    report = generate_report(metrics)
    send_report(report)

# Instantiate the DAG — this line is required
weekly_report_dag()

Schedule Options

Value | Meaning
--- | ---
"0 6 * * *" | Cron expression: daily at 06:00
"@daily" | Shorthand for 0 0 * * *
"@hourly" | Shorthand for 0 * * * *
"@weekly" | Shorthand for 0 0 * * 0 (Sunday midnight)
timedelta(hours=6) | Every 6 hours relative to start_date
None | Manual trigger only — no automatic scheduling
"@once" | Run exactly once
Dataset | Data-aware scheduling — trigger when a dataset is updated

catchup=False is usually what you want
When catchup=True (the default), Airflow creates DAG Runs for every missed interval between start_date and now. This can flood your workers with hundreds of runs when you first deploy a DAG. Set catchup=False unless you explicitly need historical backfilling.
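The flood is easy to quantify: for a @daily schedule, catchup=True creates roughly one run per elapsed day on first deploy. A stdlib sketch:

```python
from datetime import date

def missed_daily_runs(start_date: date, today: date) -> int:
    """Approximate number of DAG Runs a @daily schedule with catchup=True
    would create when the DAG is first deployed."""
    return max((today - start_date).days, 0)

# Deploying on 2024-07-01 a DAG whose start_date is 2024-01-01:
print(missed_daily_runs(date(2024, 1, 1), date(2024, 7, 1)))  # 182 backfill runs
```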

Operators

An operator defines what a task does. Airflow ships with dozens of built-in operators, and the provider packages (installed separately) add hundreds more for AWS, GCP, dbt, Spark, etc.

Core Built-in Operators

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator   # was DummyOperator pre-2.4

# BashOperator: run a shell command
check_disk = BashOperator(
    task_id="check_disk_space",
    bash_command="df -h / | awk 'NR==2 {print $5}'",
    env={"MY_VAR": "value"},         # inject env vars
    cwd="/tmp",                      # working directory
)

# PythonOperator: call a Python function
def process_file(file_path: str, **context):
    # context gives access to execution_date, ti (task instance), etc.
    logical_date = context["logical_date"]
    print(f"Processing {file_path} for {logical_date}")

process = PythonOperator(
    task_id="process_file",
    python_callable=process_file,
    op_kwargs={"file_path": "/data/input.csv"},  # passed to callable
)

# EmptyOperator: a no-op, useful as a start/end anchor
start = EmptyOperator(task_id="pipeline_start")
end   = EmptyOperator(task_id="pipeline_end")

start >> process >> end

Sensor Operators

Sensors are a special class of operator that wait for a condition to be met before succeeding. They poll on a poke_interval (default 60s) up to a timeout. In reschedule mode the task slot is freed between checks, which is more resource-efficient for long waits.

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Wait for a local file to appear
wait_for_file = FileSensor(
    task_id="wait_for_upload",
    filepath="/data/incoming/{{ ds }}/input.csv",
    poke_interval=30,              # check every 30 seconds
    timeout=60 * 60,               # fail after 1 hour
    mode="reschedule",             # release worker slot between checks
)

# Wait for a task in another DAG to succeed
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_upstream_dag",
    external_dag_id="upstream_pipeline",
    external_task_id="final_load",
    execution_delta=timedelta(hours=1),  # upstream runs 1h earlier
    mode="reschedule",
    timeout=60 * 60 * 2,
)

# Wait for an S3 object (requires amazon provider)
wait_for_s3 = S3KeySensor(
    task_id="wait_for_s3_file",
    bucket_name="my-data-bucket",
    bucket_key="raw/{{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    mode="reschedule",
    poke_interval=60,
)

Provider Operators

Provider packages extend Airflow with operators for cloud services and third-party tools. Install them separately: pip install apache-airflow-providers-amazon.

# Common provider packages
pip install apache-airflow-providers-amazon          # S3, Redshift, EMR, Glue
pip install apache-airflow-providers-google          # BigQuery, GCS, Dataflow, Vertex AI
pip install apache-airflow-providers-postgres        # PostgresOperator, PostgresHook
pip install apache-airflow-providers-slack           # SlackWebhookOperator
pip install apache-airflow-providers-http            # SimpleHttpOperator
pip install apache-airflow-providers-dbt-cloud       # dbt Cloud jobs

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Run SQL directly against a Postgres connection
run_sql = PostgresOperator(
    task_id="create_daily_summary",
    postgres_conn_id="postgres_warehouse",   # defined in Admin > Connections
    sql="""
        INSERT INTO daily_summary (date, total_orders, total_revenue)
        SELECT
            '{{ ds }}'::date,
            COUNT(*),
            SUM(amount)
        FROM orders
        WHERE created_at::date = '{{ ds }}'::date
        ON CONFLICT (date) DO UPDATE
            SET total_orders = EXCLUDED.total_orders,
                total_revenue = EXCLUDED.total_revenue;
    """,
)

# BigQuery job
bq_job = BigQueryInsertJobOperator(
    task_id="run_bq_transformation",
    gcp_conn_id="google_cloud_default",
    configuration={
        "query": {
            "query": "SELECT * FROM `project.dataset.table` WHERE date = '{{ ds }}'",
            "useLegacySql": False,
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "my_dataset",
                "tableId": "output_{{ ds_nodash }}",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        }
    },
)

Custom Operators

from typing import Any

from airflow.models import BaseOperator

class SlackNotifyOperator(BaseOperator):
    """
    Sends a message to a Slack channel.

    Args:
        channel: Slack channel name (e.g., '#alerts')
        message: Message text. Supports Jinja templating.
        slack_conn_id: Airflow connection with Slack token.
    """

    # List of fields that support Jinja templating
    template_fields = ("message",)

    def __init__(
        self,
        channel: str,
        message: str,
        slack_conn_id: str = "slack_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.channel = channel
        self.message = message
        self.slack_conn_id = slack_conn_id

    def execute(self, context: dict) -> Any:
        from airflow.providers.http.hooks.http import HttpHook

        hook = HttpHook(method="POST", http_conn_id=self.slack_conn_id)
        payload = {"channel": self.channel, "text": self.message}
        self.log.info("Sending Slack message to %s", self.channel)
        hook.run(endpoint="chat.postMessage", data=payload)
        return {"channel": self.channel, "message": self.message}


# Usage
notify = SlackNotifyOperator(
    task_id="notify_pipeline_complete",
    channel="#data-alerts",
    message="Pipeline {{ dag.dag_id }} completed for {{ ds }}",
)

TaskFlow API

Introduced in Airflow 2.0, the TaskFlow API lets you write DAGs as plain Python functions decorated with @task. It handles XCom push/pull automatically and produces cleaner, more testable code compared to the explicit operator style.

Basic @task Usage

from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests

@dag(
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["api", "etl"],
)
def api_etl():

    @task(retries=3, retry_delay=timedelta(minutes=1))
    def extract(api_url: str) -> list[dict]:
        """Fetch records from API. Return value auto-pushed to XCom."""
        resp = requests.get(api_url, timeout=30)
        resp.raise_for_status()
        return resp.json()["records"]   # must be JSON-serializable

    @task
    def transform(records: list[dict]) -> list[dict]:
        """Clean and normalize records."""
        return [
            {
                "id": r["id"],
                "name": r["name"].strip().lower(),
                "value": float(r.get("value", 0)),
            }
            for r in records
            if r.get("id")  # filter out records missing ID
        ]

    @task
    def load(records: list[dict]) -> int:
        """Insert records into DB. Returns number of rows inserted."""
        # ... DB insert logic
        print(f"Loaded {len(records)} records")
        return len(records)

    # Data flows as Python function arguments — XCom handled transparently
    raw = extract(api_url="https://api.example.com/data")
    clean = transform(raw)
    count = load(clean)

api_etl()

@task.bash and @task.branch

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def mixed_tasks():

    # Run a bash command — return value is stdout (str)
    @task.bash
    def check_file() -> str:
        return "test -f /data/input.csv && echo 'exists' || echo 'missing'"

    # Branch based on logic — return the task_id(s) to follow
    @task.branch
    def route(file_status: str) -> str:
        return "process_file" if file_status.strip() == "exists" else "skip_run"

    @task
    def process_file():
        print("Processing file...")

    @task
    def skip_run():
        print("Nothing to process today.")

    status = check_file()
    branch = route(status)
    branch >> [process_file(), skip_run()]

mixed_tasks()

TaskFlow vs. Traditional Operator — When to Use Each

Scenario | Prefer | Reason
--- | --- | ---
Python-only logic | @task | Clean, no boilerplate, auto-XCom
Reusable library operator | Operator class | Encapsulates logic, shareable
Provider integrations (BQ, S3) | Provider Operator | Built-in retry, connection handling
Large data payloads | Either (use external storage) | XCom has size limits
Mixing @task + operators | Both | They compose naturally

Dependencies & Branching

Basic Dependency Syntax

from airflow.operators.empty import EmptyOperator
from airflow.models.baseoperator import chain, cross_downstream

a = EmptyOperator(task_id="a")
b = EmptyOperator(task_id="b")
c = EmptyOperator(task_id="c")
d = EmptyOperator(task_id="d")
e = EmptyOperator(task_id="e")

# >> (bitshift): downstream dependency
a >> b >> c           # a then b then c

# << (reverse): upstream dependency
c << b                # same as b >> c

# Fan-out (a triggers b and c in parallel)
a >> [b, c]

# Fan-in (b and c must both complete before d)
[b, c] >> d

# chain(): cleaner for linear sequences
chain(a, b, c, d)

# cross_downstream(): every left task must complete before every right task
cross_downstream([a, b], [c, d])
# Equivalent to: a >> c, a >> d, b >> c, b >> d

# Explicit set_upstream / set_downstream
d.set_upstream(c)
e.set_downstream([a, b])

BranchPythonOperator

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_path(**context):
    """Return task_id (or list of task_ids) to execute."""
    hour = context["logical_date"].hour
    if hour < 12:
        return "morning_process"
    elif hour < 18:
        return "afternoon_process"
    else:
        return ["evening_process", "send_summary"]  # can return multiple

branch = BranchPythonOperator(
    task_id="route_by_time",
    python_callable=choose_path,
)

morning   = EmptyOperator(task_id="morning_process")
afternoon = EmptyOperator(task_id="afternoon_process")
evening   = EmptyOperator(task_id="evening_process")
summary   = EmptyOperator(task_id="send_summary")

# Tasks NOT chosen by branch are SKIPPED (not failed)
# Use trigger_rule="none_failed_min_one_success" on join tasks
join = EmptyOperator(
    task_id="join",
    trigger_rule="none_failed_min_one_success",
)

branch >> [morning, afternoon, evening, summary] >> join

Trigger Rules

By default a task waits for all upstream tasks to succeed (all_success). Override with trigger_rule=:

Rule | Triggers when...
--- | ---
all_success | All upstream tasks succeeded (default)
all_failed | All upstream tasks failed
all_done | All upstream tasks finished (any state)
one_success | At least one upstream task succeeded
one_failed | At least one upstream task failed
none_failed | No upstream tasks failed (success or skip OK)
none_skipped | No upstream tasks were skipped
none_failed_min_one_success | No failures, at least one success — use after branches
always | No conditions — run regardless of upstream state
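To make the table concrete, here is an illustrative pure-Python evaluation of these rules over a list of upstream terminal states — a simplification of Airflow's actual dependency logic, not its implementation:

```python
def evaluate_trigger_rule(rule: str, upstream_states: list[str]) -> bool:
    """Simplified check: would a task with this rule run, given upstream
    terminal states ("success", "failed", "skipped")?"""
    n = len(upstream_states)
    successes = upstream_states.count("success")
    failures = upstream_states.count("failed")
    skips = upstream_states.count("skipped")
    if rule == "all_success":
        return successes == n
    if rule == "all_failed":
        return failures == n
    if rule == "all_done":
        return True  # assumes all upstream states are already terminal
    if rule == "one_success":
        return successes >= 1
    if rule == "one_failed":
        return failures >= 1
    if rule == "none_failed":
        return failures == 0
    if rule == "none_skipped":
        return skips == 0
    if rule == "none_failed_min_one_success":
        return failures == 0 and successes >= 1
    if rule == "always":
        return True
    raise ValueError(f"unknown rule: {rule}")

# After a branch: one path succeeded, the others were skipped
states = ["success", "skipped", "skipped"]
print(evaluate_trigger_rule("all_success", states))                  # False
print(evaluate_trigger_rule("none_failed_min_one_success", states))  # True
```

This is why a join after a branch needs none_failed_min_one_success: the default all_success fails the moment any branch path is skipped.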

XComs

XCom (Cross-Communication) lets task instances share small pieces of data. Values are serialized and stored in the Airflow metadata database — which means they are limited in size (a few MB at most) and not suitable for passing DataFrames, large result sets, or binary blobs.

Explicit Push / Pull

from airflow.operators.python import PythonOperator

def push_data(ti, **context):
    # ti = task instance; Airflow injects it when the parameter is named "ti"
    record_count = 42_000
    # Default key is "return_value"; use custom keys for multiple values
    ti.xcom_push(key="record_count", value=record_count)
    ti.xcom_push(key="source_file", value="/data/2024-01-01.csv")

def pull_data(ti, **context):
    count = ti.xcom_pull(task_ids="extract_task", key="record_count")
    path  = ti.xcom_pull(task_ids="extract_task", key="source_file")
    print(f"Loaded {count} records from {path}")
    # Pulling the return value (implicit push via return statement)
    prev_result = ti.xcom_pull(task_ids="some_other_task")  # key defaults to "return_value"

extract_task = PythonOperator(task_id="extract_task", python_callable=push_data)
load_task    = PythonOperator(task_id="load_task",    python_callable=pull_data)

extract_task >> load_task

Auto-XCom with TaskFlow

from airflow.decorators import task

@task
def extract() -> dict:
    # Returning a value automatically pushes it as XCom key="return_value"
    return {"rows": 1000, "file": "/data/raw.csv"}

@task
def load(data: dict):
    # Passing a @task return value as an argument auto-pulls from XCom
    print(f"Loading {data['rows']} rows from {data['file']}")

# Inside a @dag-decorated function or `with DAG(...)` block:
result = extract()
load(result)   # Airflow resolves the XCom dependency automatically

XCom Limitations and Best Practices

XComs are not for large data
XCom values are stored in the metadata database. Pushing large objects (DataFrames, file contents, large JSON arrays) will bloat the DB and cause performance issues. The hard limit depends on your DB column type — typically a few MB.
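A common defensive pattern is to measure the serialized payload before pushing and fall back to a storage reference when it is too big. A sketch with an arbitrary 1 MB budget — the real ceiling depends on your metadata DB, and the bucket path below is purely hypothetical:

```python
import json

MAX_XCOM_BYTES = 1_000_000  # illustrative budget, not an Airflow constant

def xcom_safe(value) -> bool:
    """True if the value serializes small enough to push as an XCom."""
    return len(json.dumps(value).encode("utf-8")) <= MAX_XCOM_BYTES

def push_result(rows: list[dict]):
    """Push small results inline; otherwise push only a reference to
    external storage and let the downstream task fetch the data itself."""
    if xcom_safe(rows):
        return {"inline": rows}
    # Hypothetical pattern: write the data to object storage, pass the path
    return {"ref": "s3://my-bucket/results/run.json"}
```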

Variables, Connections & Hooks

Variables

Variables are key-value pairs stored in the metadata DB. Use them for environment-specific configuration that should not be hardcoded in DAG files.

from airflow.models import Variable

# Read a variable (fails loudly if missing — good for required config)
bucket = Variable.get("s3_data_bucket")

# Read with a default (optional config)
batch_size = int(Variable.get("etl_batch_size", default_var=1000))

# Read a JSON variable and deserialize automatically
config = Variable.get("pipeline_config", deserialize_json=True)
# config is now a Python dict

# Set a variable (usually done via UI or CLI, but possible in code)
Variable.set("last_run_date", "2024-01-01")

# CLI variable management
airflow variables set s3_data_bucket my-prod-bucket
airflow variables get s3_data_bucket
airflow variables list
airflow variables export variables.json
airflow variables import variables.json

Variable.get() at module scope hits the DB on every parse
Calling Variable.get() at the top of a DAG file (outside a function) runs a DB query every time the scheduler parses the file — multiple times per minute. Always call Variable.get() inside task functions or use Jinja template syntax {{ var.value.my_key }} instead.

Connections

Connections store credentials and endpoint info for external systems. They are referenced by a conn_id string in operators and hooks.

# Add a connection via CLI
airflow connections add postgres_warehouse \
  --conn-type postgres \
  --conn-host db.example.com \
  --conn-port 5432 \
  --conn-login etl_user \
  --conn-password "s3cr3t" \
  --conn-schema warehouse

# Add via environment variable (preferred for production)
# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE}=conn-type://user:pass@host:port/schema
export AIRFLOW_CONN_POSTGRES_WAREHOUSE="postgres://etl_user:[email protected]:5432/warehouse"
export AIRFLOW_CONN_AWS_DEFAULT="aws://AKID:SECRET_KEY@/?region_name=us-east-1"

Hooks

Hooks are the low-level connection classes used by operators internally. You can also use them directly in PythonOperator or @task functions.

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook

# PostgresHook: execute SQL, get a connection, or load a DataFrame
def query_db(**context):
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")

    # Execute and fetch
    records = hook.get_records("SELECT id, name FROM users WHERE active = true")

    # Get a raw DBAPI connection
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute("INSERT INTO audit_log (event) VALUES (%s)", ("pipeline_run",))
    conn.commit()

    # Load to/from pandas (requires pandas)
    df = hook.get_pandas_df("SELECT * FROM daily_metrics WHERE date = %s", parameters=["2024-01-01"])
    return len(df)

# S3Hook
def upload_to_s3(**context):
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename="/tmp/output.csv",
        key=f"processed/{context['ds']}/output.csv",
        bucket_name="my-data-bucket",
        replace=True,
    )
    # Check if file exists
    exists = hook.check_for_key("raw/input.csv", bucket_name="my-data-bucket")

# HttpHook: call REST APIs
def call_api(**context):
    hook = HttpHook(method="GET", http_conn_id="my_api")
    response = hook.run(
        endpoint=f"/data?date={context['ds']}",
        headers={"Accept": "application/json"},
    )
    return response.json()

Secrets Backend

For production, store secrets in a proper secrets manager rather than the metadata DB:

# airflow.cfg / environment variable configuration for AWS Secrets Manager
export AIRFLOW__SECRETS__BACKEND="airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'

# Store a connection in AWS Secrets Manager
aws secretsmanager create-secret \
  --name "airflow/connections/postgres_warehouse" \
  --secret-string "postgres://etl_user:[email protected]:5432/warehouse"

Scheduling & Execution

execution_date vs. logical_date vs. data_interval

Airflow's scheduling model can be confusing at first. The key insight is that a DAG Run scheduled for a given time interval processes data from that interval, and typically runs after the interval ends.

Term | Meaning
--- | ---
logical_date | The start of the data interval this run covers. Previously called execution_date.
data_interval_start | Same as logical_date — beginning of the data window
data_interval_end | End of the data window (= logical_date + schedule interval)
run_id | Unique identifier for the run, e.g. scheduled__2024-01-01T06:00:00+00:00
ds | Jinja shorthand: logical_date.strftime('%Y-%m-%d')
ds_nodash | Same without dashes: 20240101
ts | Full ISO 8601 timestamp of logical_date

The "N-1" scheduling model
A DAG with schedule="@daily" and start_date=2024-01-01 will create its first run with logical_date=2024-01-01 at 2024-01-02 00:00 (after the interval ends). This is intentional — the run processes data from 2024-01-01, which is only complete once 2024-01-02 begins.
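The arithmetic for a daily schedule, sketched with the stdlib:

```python
from datetime import datetime, timedelta

def daily_data_interval(logical_date: datetime) -> tuple[datetime, datetime]:
    """For a @daily schedule: data_interval_start == logical_date,
    and the interval ends one schedule period later."""
    return logical_date, logical_date + timedelta(days=1)

start, end = daily_data_interval(datetime(2024, 1, 1))
print(start.date(), "->", end.date())  # 2024-01-01 -> 2024-01-02
# The run itself is created at `end`, once the 2024-01-01 window is complete.
```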

Backfilling

# Backfill DAG runs for a specific date range
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  daily_etl_pipeline

# Run with 4 parallel DAG runs
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --max-active-runs 4 \
  daily_etl_pipeline

# Dry run: show what would be run without executing
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --dry-run \
  daily_etl_pipeline

Time Zones

from datetime import datetime
import pendulum

# Always use timezone-aware datetimes for start_date
# pendulum is bundled with Airflow and is the recommended library

@dag(
    schedule="0 9 * * 1-5",            # 9 AM Mon-Fri
    start_date=pendulum.datetime(2024, 1, 1, tz="America/New_York"),
    catchup=False,
)
def business_hours_dag():
    pass

Airflow stores all datetimes in UTC internally
Even if you specify a timezone-aware start_date, Airflow converts everything to UTC in the metadata DB. The UI displays in UTC by default. Configure default_ui_timezone in airflow.cfg to display in your local timezone.
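You can see the conversion with the stdlib zoneinfo module (Python 3.9+). January 1 falls in EST (UTC-5), so 9 AM New York stores as 14:00 UTC:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

local = datetime(2024, 1, 1, 9, 0, tzinfo=ZoneInfo("America/New_York"))
utc = local.astimezone(timezone.utc)
print(utc.isoformat())  # 2024-01-01T14:00:00+00:00
```

Note that cron schedules defined against a DST-observing timezone shift their UTC trigger time twice a year — the 9 AM New York run fires at 13:00 UTC during daylight saving time.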

Testing & Debugging

CLI Testing Commands

# Run a full DAG end-to-end locally, without a scheduler
airflow dags test daily_etl_pipeline 2024-01-15

# Run a single task in isolation (ignores dependencies; does not record state in the metadata DB)
airflow tasks test daily_etl_pipeline extract_sales 2024-01-15

# List all DAGs
airflow dags list

# List tasks in a DAG
airflow tasks list daily_etl_pipeline --tree

# Check for DAG import errors
airflow dags list-import-errors

# Print the DAG structure in Graphviz DOT format (use --save graph.png to render an image)
airflow dags show daily_etl_pipeline

Unit Testing DAGs with pytest

# tests/test_daily_etl_dag.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="module")
def dag_bag():
    """Load DAGs from the dags/ folder."""
    return DagBag(dag_folder="dags/", include_examples=False)

class TestDailyEtlDag:

    def test_dag_loaded(self, dag_bag):
        """DAG should be importable with no errors."""
        assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"
        assert "daily_etl_pipeline" in dag_bag.dags

    def test_task_count(self, dag_bag):
        dag = dag_bag.get_dag("daily_etl_pipeline")
        assert len(dag.tasks) == 3

    def test_task_ids(self, dag_bag):
        dag = dag_bag.get_dag("daily_etl_pipeline")
        task_ids = {t.task_id for t in dag.tasks}
        assert task_ids == {"extract_sales", "transform_sales", "load_to_warehouse"}

    def test_dependency_order(self, dag_bag):
        dag = dag_bag.get_dag("daily_etl_pipeline")
        extract  = dag.get_task("extract_sales")
        transform = dag.get_task("transform_sales")
        load     = dag.get_task("load_to_warehouse")

        assert transform in extract.downstream_list
        assert load in transform.downstream_list

    def test_schedule(self, dag_bag):
        dag = dag_bag.get_dag("daily_etl_pipeline")
        assert str(dag.schedule_interval) == "0 6 * * *"

    def test_catchup_disabled(self, dag_bag):
        dag = dag_bag.get_dag("daily_etl_pipeline")
        assert dag.catchup is False


# Unit test a plain Python callable used by a task
# (assumes transform_data is importable from the DAG module and accepts a list of records)
from dags.daily_etl_pipeline import transform_data

class TestTransformData:

    def test_strips_whitespace(self):
        input_records = [{"id": 1, "name": "  Alice  ", "value": "42.5"}]
        result = transform_data(input_records)
        assert result[0]["name"] == "alice"
        assert result[0]["value"] == 42.5

    def test_filters_missing_id(self):
        input_records = [{"id": None, "name": "Bob"}, {"id": 2, "name": "Carol"}]
        result = transform_data(input_records)
        assert len(result) == 1
        assert result[0]["id"] == 2

Common Errors and Fixes

Error | Cause | Fix
--- | --- | ---
DAG not appearing in UI | Import error, wrong folder, paused | Run airflow dags list-import-errors; check scheduler logs
ModuleNotFoundError in scheduler | Package not installed in scheduler's Python env | Install package in scheduler container; rebuild Docker image
Tasks stuck in queued | No available worker slots, executor misconfigured | Check parallelism config; check Celery worker logs
XCom data missing | Task ran in a different DAG Run; wrong key | Verify task_ids and key in xcom_pull()
Sensor timeout | External condition never met within timeout window | Increase timeout; check external system; use mode="reschedule"
Jinja template not rendered | Value passed to non-templated field | Check template_fields on operator; use op_kwargs for Python callables

Executors

The executor determines how and where task instances are run. Configure it in airflow.cfg under [core] executor or via AIRFLOW__CORE__EXECUTOR.

Executor Types

Executor | Best For | Notes
--- | --- | ---
SequentialExecutor | Development / testing only | Runs one task at a time; default with SQLite
LocalExecutor | Single-machine production, low-medium workloads | Spawns subprocesses; requires PostgreSQL or MySQL
CeleryExecutor | Distributed, scalable, multi-worker | Needs Redis or RabbitMQ broker + Celery workers
KubernetesExecutor | Cloud-native, task-level isolation | Each task runs in its own pod; zero idle workers
CeleryKubernetesExecutor | Hybrid: fast Celery tasks + isolated K8s tasks | Route per-task via queue="kubernetes"
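A minimal sketch of the environment wiring for CeleryExecutor, assuming Redis as the broker and Postgres for both metadata and Celery results (the hostnames here are placeholders):

```shell
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@postgres/airflow"
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://airflow:airflow@postgres/airflow"

# Start a Celery worker on each worker machine
airflow celery worker
```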

Concurrency Configuration

# airflow.cfg relevant settings (or use AIRFLOW__SECTION__KEY env vars)

# Max total task instances running across all DAGs at once (default: 32)
AIRFLOW__CORE__PARALLELISM=32

# Max running tasks per DAG at one time (default: 16)
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16

# Max concurrent DAG runs per DAG (default: 16)
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=16

# Per-DAG overrides in DAG definition:
# max_active_tasks=8       # override MAX_ACTIVE_TASKS_PER_DAG for this DAG
# max_active_runs=2        # override MAX_ACTIVE_RUNS_PER_DAG for this DAG

# CeleryExecutor: number of parallel tasks per worker process
AIRFLOW__CELERY__WORKER_CONCURRENCY=16
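These caps compose rather than override each other: a queued task instance starts only when the global parallelism limit, its DAG's max_active_tasks, and (if assigned) its pool all have free slots. A pure-Python sketch of that accounting (effective_task_cap is an illustrative helper, not an Airflow API):

```python
def effective_task_cap(parallelism: int,
                       max_active_tasks_per_dag: int,
                       pool_slots_available: int) -> int:
    """A task runs only if every applicable limit has headroom,
    so the effective cap is the smallest of them."""
    return min(parallelism, max_active_tasks_per_dag, pool_slots_available)

# Defaults (32 global, 16 per DAG) combined with a 5-slot pool:
# at most 5 tasks from that pool run concurrently
print(effective_task_cap(32, 16, 5))  # 5
```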

KubernetesExecutor Basics

# airflow.cfg / environment
# AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
# AIRFLOW__KUBERNETES__NAMESPACE=airflow
# AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY=my-registry/airflow
# AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG=2.10.4

# Per-task pod overrides using executor_config
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
    task_id="heavy_computation",
    python_callable=run_heavy_job,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "2", "memory": "4Gi"},
                            limits={"cpu": "4", "memory": "8Gi"},
                        ),
                    )
                ]
            )
        )
    },
)

Production Patterns

Dynamic DAG Generation

Generate multiple similar DAGs from a config list. Each DAG gets its own dag_id and is registered independently.

# dags/dynamic_etl_dags.py
from datetime import datetime
from airflow.decorators import dag, task

# Config-driven DAG factory: one DAG per data source
DATA_SOURCES = [
    {"name": "salesforce", "table": "opportunities", "schedule": "0 7 * * *"},
    {"name": "hubspot",    "table": "contacts",      "schedule": "0 8 * * *"},
    {"name": "stripe",     "table": "charges",       "schedule": "0 */4 * * *"},
]

def make_etl_dag(source_name: str, table: str, schedule: str):
    """Factory function returns a configured DAG object."""

    @dag(
        dag_id=f"etl_{source_name}_{table}",
        schedule=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["etl", source_name],
    )
    def etl_dag():

        @task
        def extract() -> list[dict]:
            print(f"Extracting {table} from {source_name}")
            # ... real extraction logic
            return []

        @task
        def load(records: list[dict]):
            print(f"Loading {len(records)} records into {table}")

        load(extract())

    return etl_dag()

# Register all DAGs in module scope — Airflow discovers them automatically
for source in DATA_SOURCES:
    make_etl_dag(source["name"], source["table"], source["schedule"])

Idempotent Tasks

Tasks should be safe to re-run. If a task fails partway through and is retried, it should produce the same result without duplication or corruption.

@task
def load_to_postgres(records: list[dict], **context):
    """
    Idempotent load: delete the partition first, then insert.
    Re-running this task for the same logical_date produces identical DB state.
    """
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    date = context["ds"]
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")

    with hook.get_conn() as conn:
        with conn.cursor() as cur:
            # DELETE then INSERT (upsert pattern)
            cur.execute("DELETE FROM daily_sales WHERE date = %s", (date,))
            cur.executemany(
                "INSERT INTO daily_sales (date, product_id, revenue) VALUES (%s, %s, %s)",
                [(date, r["product_id"], r["revenue"]) for r in records],
            )
        conn.commit()

    return len(records)
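The property worth verifying is that a retry leaves the table unchanged. A standalone sqlite3 sketch of the same delete-then-insert pattern (table and helper names are illustrative, not part of the DAG above):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (date TEXT, product_id INT, revenue REAL)")

def load_partition(conn, date, records):
    # Idempotent: wipe the partition, then insert -- safe to re-run
    conn.execute("DELETE FROM daily_sales WHERE date = ?", (date,))
    conn.executemany("INSERT INTO daily_sales VALUES (?, ?, ?)",
                     [(date, r["product_id"], r["revenue"]) for r in records])
    conn.commit()

records = [{"product_id": 1, "revenue": 9.99}, {"product_id": 2, "revenue": 4.50}]
load_partition(conn, "2024-01-15", records)
load_partition(conn, "2024-01-15", records)   # simulated retry: no duplicates
count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(count)  # 2
```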

Retry and Timeout Configuration

from datetime import timedelta
from airflow.operators.python import PythonOperator

# Fine-grained retry config at task level
flaky_api_call = PythonOperator(
    task_id="call_external_api",
    python_callable=call_api,
    retries=5,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,   # delay roughly doubles: 2, 4, 8, 16 min...
    max_retry_delay=timedelta(minutes=30),   # ...capped at 30 minutes
    execution_timeout=timedelta(minutes=15),  # kill task if it runs too long
)
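With retry_delay=2min, exponential backoff, and max_retry_delay=30min, the wait roughly doubles per attempt until it hits the cap. A pure-Python sketch of that capped-doubling shape (Airflow's real schedule also adds per-task jitter; backoff_delay is an illustrative helper, not an Airflow API):

```python
from datetime import timedelta

def backoff_delay(retry_delay: timedelta, try_number: int,
                  max_retry_delay: timedelta) -> timedelta:
    # Doubling schedule, capped at max_retry_delay (jitter omitted)
    return min(retry_delay * 2 ** (try_number - 1), max_retry_delay)

delays = [backoff_delay(timedelta(minutes=2), n, timedelta(minutes=30))
          for n in range(1, 6)]
print([int(d.total_seconds() // 60) for d in delays])  # [2, 4, 8, 16, 30]
```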

# SLA: raise an alert if task hasn't completed by this offset from DAG start
slow_job = PythonOperator(
    task_id="slow_transformation",
    python_callable=run_transformation,
    sla=timedelta(hours=2),          # alert if not done 2h after DAG start
)

Alerting via Callbacks

from datetime import datetime

from airflow import DAG

def on_failure_callback(context):
    """Called when any task in the DAG fails."""
    dag_id  = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    log_url = context["task_instance"].log_url
    date    = context["ds"]

    message = (
        f":red_circle: *Task Failed*\n"
        f"DAG: `{dag_id}` | Task: `{task_id}` | Date: `{date}`\n"
        f"<{log_url}|View Logs>"
    )
    # Use the SlackWebhookOperator or a direct HTTP call here
    print(message)  # replace with actual Slack webhook call

def on_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed for tasks: {[str(t) for t in task_list]}")

with DAG(
    dag_id="production_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=on_failure_callback,   # DAG-level failure callback
    sla_miss_callback=on_sla_miss_callback,
    default_args={
        "on_failure_callback": on_failure_callback,  # task-level
    },
) as dag:
    pass

Pools for Resource Management

Pools limit how many tasks can use a shared resource concurrently — useful for rate-limiting API calls or protecting a downstream database.

# Create a pool via CLI
airflow pools set external_api_pool 5 "Limit concurrent external API calls to 5"

# Or bulk-import pool definitions from a JSON file
airflow pools import pools.json

# Assign a task to a pool (Python, in the DAG file)
api_task = PythonOperator(
    task_id="call_rate_limited_api",
    python_callable=call_api,
    pool="external_api_pool",         # only 5 tasks from this pool run at once
    pool_slots=1,                     # this task takes 1 slot (default)
)

# A "heavy" task might take multiple slots
heavy_job = PythonOperator(
    task_id="heavy_spark_job",
    python_callable=run_spark,
    pool="spark_pool",
    pool_slots=3,                     # counts as 3 slots
)
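Scheduling-wise, a pool behaves like a counting semaphore over its slots: a task with pool_slots=n starts only when n slots are free, and otherwise stays queued. A pure-Python sketch of that accounting (not Airflow code; the Pool class here is illustrative):

```python
class Pool:
    def __init__(self, slots: int):
        self.slots = slots
        self.used = 0

    def try_acquire(self, n: int = 1) -> bool:
        # A task starts only if all n requested slots are free
        if self.used + n <= self.slots:
            self.used += n
            return True
        return False  # otherwise it stays queued

    def release(self, n: int = 1):
        self.used -= n

pool = Pool(slots=5)
assert pool.try_acquire(3)      # heavy task takes 3 slots
assert pool.try_acquire(2)      # two more light tasks fill the pool
assert not pool.try_acquire(1)  # sixth slot request waits in the queue
```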

Common Production Use Cases

ETL Pipeline: API → Transform → Warehouse

from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.models import Variable
import requests
import json

@dag(
    dag_id="orders_etl",
    schedule="0 2 * * *",                # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    tags=["etl", "orders"],
)
def orders_etl():
    """
    Daily orders ETL:
    1. Extract orders from Orders API for the previous day
    2. Transform: normalize fields, compute derived metrics
    3. Load to PostgreSQL warehouse (idempotent UPSERT)
    """

    @task(pool="external_api_pool")
    def extract_orders(**context) -> str:
        """
        Fetch orders for the data interval. Returns S3 path, not the data
        itself, to avoid hitting XCom size limits.
        """
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        date = context["ds"]
        resp = requests.get(
            "https://api.orders.example.com/orders",
            params={"date": date, "limit": 10_000},
            headers={"Authorization": f"Bearer {Variable.get('orders_api_token')}"},
            timeout=60,
        )
        resp.raise_for_status()
        orders = resp.json()["data"]

        # Write to S3 staging; pass only the path via XCom
        s3_path = f"staging/orders/{date}/raw.json"
        S3Hook(aws_conn_id="aws_default").load_string(
            string_data=json.dumps(orders),
            key=s3_path,
            bucket_name="etl-staging",
            replace=True,
        )
        return s3_path

    @task
    def transform_orders(s3_path: str, **context) -> str:
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id="aws_default")
        raw = json.loads(hook.read_key(s3_path, bucket_name="etl-staging"))

        transformed = []
        for order in raw:
            transformed.append({
                "order_id":    order["id"],
                "customer_id": order["customer"]["id"],
                "total_usd":   round(sum(item["price"] * item["qty"] for item in order["items"]), 2),
                "status":      order["status"].lower(),
                "created_at":  order["created_at"],
                "date":        context["ds"],
            })

        out_path = s3_path.replace("raw.json", "transformed.json")
        hook.load_string(
            string_data=json.dumps(transformed),
            key=out_path,
            bucket_name="etl-staging",
            replace=True,
        )
        return out_path

    @task
    def load_to_warehouse(s3_path: str, **context) -> int:
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        records = json.loads(
            S3Hook(aws_conn_id="aws_default").read_key(s3_path, bucket_name="etl-staging")
        )
        date = context["ds"]
        hook = PostgresHook(postgres_conn_id="postgres_warehouse")

        with hook.get_conn() as conn:
            with conn.cursor() as cur:
                # Idempotent: delete-insert for the partition
                cur.execute("DELETE FROM orders WHERE date = %s", (date,))
                cur.executemany(
                    """
                    INSERT INTO orders (order_id, customer_id, total_usd, status, created_at, date)
                    VALUES (%(order_id)s, %(customer_id)s, %(total_usd)s, %(status)s, %(created_at)s, %(date)s)
                    """,
                    records,
                )
            conn.commit()

        return len(records)

    raw_path = extract_orders()
    clean_path = transform_orders(raw_path)
    load_to_warehouse(clean_path)

orders_etl()

Data Quality Checks with Great Expectations

from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.exceptions import AirflowException

@dag(
    dag_id="orders_with_dq_checks",
    schedule="0 3 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def orders_with_dq():

    load_complete = EmptyOperator(task_id="load_complete")

    @task
    def run_dq_checks(**context) -> dict:
        """
        Run data quality checks after load.
        Raises AirflowException to fail the task if checks fail.
        """
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        date = context["ds"]
        hook = PostgresHook(postgres_conn_id="postgres_warehouse")

        checks = {}

        # Check 1: Row count should be > 0
        row_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s", parameters=(date,)
        )[0]
        checks["row_count"] = row_count
        if row_count == 0:
            raise AirflowException(f"DQ FAIL: Zero rows loaded for date {date}")

        # Check 2: No null order_ids
        null_ids = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s AND order_id IS NULL",
            parameters=(date,),
        )[0]
        checks["null_order_ids"] = null_ids
        if null_ids > 0:
            raise AirflowException(f"DQ FAIL: {null_ids} null order_ids for date {date}")

        # Check 3: Revenue should be positive
        negative_revenue = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s AND total_usd < 0",
            parameters=(date,),
        )[0]
        checks["negative_revenue"] = negative_revenue
        if negative_revenue > 0:
            raise AirflowException(f"DQ FAIL: {negative_revenue} orders with negative revenue")

        print(f"DQ checks passed: {checks}")
        return checks

    @task
    def publish_to_reporting():
        print("All checks passed — publishing to reporting layer")

    load_complete >> run_dq_checks() >> publish_to_reporting()

orders_with_dq()

ML Pipeline: Train → Evaluate → Conditional Deploy

from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

@dag(
    dag_id="model_training_pipeline",
    schedule="@weekly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
)
def model_training():

    @task
    def prepare_training_data(**context) -> str:
        """Extract and prepare training data. Returns path to dataset."""
        date = context["ds"]
        print(f"Preparing training data up to {date}")
        # ... data preparation logic
        return f"s3://ml-bucket/training-data/{date}/dataset.parquet"

    @task
    def train_model(data_path: str) -> dict:
        """Train the model. Returns metrics."""
        print(f"Training on {data_path}")
        # ... training logic (could submit a SageMaker/Vertex job)
        return {
            "accuracy": 0.923,
            "f1_score": 0.911,
            "model_path": "s3://ml-bucket/models/latest/model.pkl",
            "baseline_accuracy": 0.900,
        }

    def decide_deploy(ti, **context) -> str:
        """Branch: deploy if model beats baseline, else reject."""
        metrics = ti.xcom_pull(task_ids="train_model")
        if metrics["accuracy"] > metrics["baseline_accuracy"]:
            return "deploy_model"
        return "reject_model"

    branch = BranchPythonOperator(
        task_id="evaluate_and_decide",
        python_callable=decide_deploy,
    )

    @task(trigger_rule="none_failed_min_one_success")
    def deploy_model(ti=None, **context):
        metrics = ti.xcom_pull(task_ids="train_model")
        print(f"Deploying model with accuracy={metrics['accuracy']:.3f}")
        # ... deployment logic (update serving endpoint, etc.)

    @task
    def reject_model(ti=None, **context):
        metrics = ti.xcom_pull(task_ids="train_model")
        print(f"Rejecting model: accuracy={metrics['accuracy']:.3f} did not beat baseline")

    notify_done = EmptyOperator(
        task_id="notify_done",
        trigger_rule="none_failed_min_one_success",
    )

    data_path = prepare_training_data()
    metrics   = train_model(data_path)
    metrics >> branch
    branch >> [deploy_model(), reject_model()] >> notify_done

model_training()

Report Generation: Query → Template → Send

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="weekly_business_report",
    schedule="0 8 * * 1",           # Monday 8 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["reports", "email"],
)
def weekly_business_report():

    @task
    def gather_metrics(**context) -> dict:
        """Query key business metrics for the past week."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="postgres_warehouse")

        # Use data_interval_start/end for the correct week window
        start = context["data_interval_start"].strftime("%Y-%m-%d")
        end   = context["data_interval_end"].strftime("%Y-%m-%d")

        revenue = hook.get_first(
            "SELECT COALESCE(SUM(total_usd), 0) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]

        order_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]

        new_customers = hook.get_first(
            "SELECT COUNT(DISTINCT customer_id) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]

        return {
            "week_start": start,
            "week_end": end,
            "revenue": float(revenue),
            "order_count": int(order_count),
            "new_customers": int(new_customers),
            "avg_order_value": round(float(revenue) / max(int(order_count), 1), 2),
        }

    @task
    def render_report(metrics: dict) -> str:
        """Render HTML report from metrics dict."""
        html = f"""
        <html><body style="font-family: sans-serif; max-width: 600px; margin: 0 auto;">
          <h2>Weekly Business Report — {metrics['week_start']} to {metrics['week_end']}</h2>
          <table border="1" cellpadding="8" style="border-collapse: collapse; width: 100%;">
            <tr><th>Metric</th><th>Value</th></tr>
            <tr><td>Total Revenue</td><td>${metrics['revenue']:,.2f}</td></tr>
            <tr><td>Orders</td><td>{metrics['order_count']:,}</td></tr>
            <tr><td>New Customers</td><td>{metrics['new_customers']:,}</td></tr>
            <tr><td>Avg Order Value</td><td>${metrics['avg_order_value']:,.2f}</td></tr>
          </table>
        </body></html>
        """
        return html

    @task
    def send_email_report(html: str, **context):
        """Send report via SMTP using Airflow's email connection."""
        from airflow.utils.email import send_email

        send_email(
            to=["[email protected]", "[email protected]"],
            subject=f"Weekly Business Report — {context['ds']}",
            html_content=html,
        )
        print("Report sent successfully")

    metrics = gather_metrics()
    html    = render_report(metrics)
    send_email_report(html)

weekly_business_report()
Jinja templating in operators vs. @task functions
In BashOperator and PostgresOperator, template strings like {{ ds }} are rendered automatically because those fields are listed in template_fields. In @task functions, access the same values through the task context instead: declare **context in the function signature and read context["ds"], or call get_current_context() from airflow.operators.python.
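The substitution itself is ordinary Jinja. A standalone illustration using the jinja2 library directly (outside Airflow, which supplies a much richer context than the single ds value shown here):

```python
from jinja2 import Template

# Conceptually what Airflow does to each string in an operator's template_fields
rendered = Template("echo 'processing {{ ds }}'").render(ds="2024-01-15")
print(rendered)  # echo 'processing 2024-01-15'
```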