Setup & Environment
The fastest path to a working Airflow environment is the official Docker Compose setup. It gives you a production-like stack (PostgreSQL metadata DB, Redis broker, Celery workers, scheduler, webserver) without any local Python dependency wrestling.
Docker Compose (Recommended)
# 1. Download the official docker-compose.yaml for Airflow 2.10.4
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.4/docker-compose.yaml'
# 2. Create required directories
mkdir -p ./dags ./logs ./plugins ./config
# 3. Set your host user ID so containers write files you can edit
echo -e "AIRFLOW_UID=$(id -u)" > .env
# 4. Initialize the metadata database and create the admin user
docker compose up airflow-init
# 5. Start all services in the background
docker compose up -d
# 6. Watch scheduler and webserver come up
docker compose logs -f webserver scheduler
Access the UI at http://localhost:8080 — username airflow, password airflow.
Local pip Install (for Development/Testing)
# Python 3.9–3.12 supported
python -m venv airflow-venv
source airflow-venv/bin/activate
# Airflow uses constraint files to avoid dependency conflicts
AIRFLOW_VERSION=2.10.4
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Initialize SQLite metadata DB and create admin user
airflow db migrate
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email [email protected] \
--password admin
# Start webserver and scheduler in separate terminals
airflow webserver --port 8080
airflow scheduler
Use LocalExecutor with PostgreSQL for anything beyond single-task DAGs. The Docker setup is strongly preferred for realistic testing.
Verification Checklist
- UI loads at http://localhost:8080
- Example DAGs (prefixed example_) are visible in the DAG list
- Trigger example_bash_operator manually — it should complete green
- Check Admin → Connections to see pre-configured connections
Core Concepts
Airflow is a workflow orchestrator, not a data processor. It schedules and monitors pipelines defined as Python code — it does not move or transform data itself. Think of it as the traffic controller that decides when and where work runs, and tracks what happened.
Architecture
| Component | Role |
|---|---|
| DAG | Directed Acyclic Graph — a Python file defining tasks and their dependencies |
| Task | A unit of work, defined by an Operator instance |
| Operator | Template for a task type (BashOperator, PythonOperator, etc.) |
| DAG Run | A single execution instance of a DAG for a specific logical date |
| Task Instance | A single execution of a Task within a DAG Run |
| Scheduler | Parses DAG files, decides when DAG Runs should be created, submits tasks to the executor |
| Executor | Mechanism for running tasks (Local, Celery, Kubernetes) |
| Metadata DB | PostgreSQL/MySQL database storing all DAG, run, and task state |
| Worker | Process or pod that actually executes a task instance |
| Webserver | Flask app serving the UI and REST API |
The "Define in Python, Execute Anywhere" Philosophy
DAG files are pure Python code — no YAML configs, no XML. This means you get loops, conditionals, imports, and tests for free. The scheduler imports your DAG files repeatedly (every ~30 seconds by default) to detect changes, so DAG-level code must be fast and side-effect free. Heavy computation belongs inside task functions, not at module scope.
Task Instance Lifecycle
Task instances transition through these states:
scheduled → queued → running →
success / failed / skipped / up_for_retry
- scheduled — dependencies met, waiting for executor slot
- queued — submitted to executor, waiting for a worker
- running — worker is actively executing
- up_for_retry — failed but retries remain
- skipped — a branching operator chose a different path
- deferred — async sensor waiting for an external trigger
DAGs
A DAG file is any Python file in the dags/ folder that contains a DAG object.
There are two styles: the context manager (classic) and the
@dag decorator (TaskFlow API, Airflow 2.0+).
Context Manager Style (Classic)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# default_args apply to all tasks unless overridden at task level
default_args = {
"owner": "data-team",
"depends_on_past": False, # don't wait for previous run's same task
"email": ["[email protected]"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True, # 5min, 10min, 20min...
"execution_timeout": timedelta(hours=1),
}
with DAG(
dag_id="daily_etl_pipeline",
description="Extract, transform, load daily sales data",
schedule="0 6 * * *", # cron: daily at 06:00 UTC
start_date=datetime(2024, 1, 1),
catchup=False, # don't backfill missed runs
default_args=default_args,
tags=["etl", "sales", "daily"],
max_active_runs=1, # only one concurrent DAG run
doc_md="""
## Daily ETL Pipeline
Extracts sales data from the API, transforms it, and loads to the warehouse.
Runs daily at 06:00 UTC.
""",
) as dag:
extract = BashOperator(
task_id="extract_sales",
bash_command="python /opt/scripts/extract.py --date {{ ds }}",
)
def transform_data(**context):
execution_date = context["ds"]
print(f"Transforming data for {execution_date}")
# ... transformation logic
transform = PythonOperator(
task_id="transform_sales",
python_callable=transform_data,
)
load = BashOperator(
task_id="load_to_warehouse",
bash_command="python /opt/scripts/load.py --date {{ ds }}",
)
extract >> transform >> load
Decorator Style (@dag)
from datetime import datetime, timedelta
from airflow.decorators import dag, task
@dag(
dag_id="weekly_report",
schedule="@weekly", # shorthand: @hourly, @daily, @weekly, @monthly
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["reports"],
default_args={"retries": 1, "retry_delay": timedelta(minutes=2)},
)
def weekly_report_dag():
"""Generate and send the weekly business report."""
@task
def fetch_metrics() -> dict:
# Return value is automatically pushed as an XCom
return {"revenue": 100_000, "orders": 4_200}
@task
def generate_report(metrics: dict) -> str:
report = f"Revenue: ${metrics['revenue']:,}, Orders: {metrics['orders']:,}"
return report
@task
def send_report(report: str):
print(f"Sending: {report}")
# ... email/Slack send logic
# Wiring: pass return values directly between @task functions
metrics = fetch_metrics()
report = generate_report(metrics)
send_report(report)
# Instantiate the DAG — this line is required
weekly_report_dag()
Schedule Options
| Value | Meaning |
|---|---|
"0 6 * * *" | Cron expression: daily at 06:00 |
"@daily" | Shorthand for 0 0 * * * |
"@hourly" | Shorthand for 0 * * * * |
"@weekly" | Sunday midnight |
timedelta(hours=6) | Every 6 hours relative to start_date |
None | Manual trigger only — no automatic scheduling |
"@once" | Run exactly once |
| Dataset | Data-aware scheduling — trigger when a dataset is updated |
With catchup=True (the default), Airflow creates DAG Runs for every missed
interval between start_date and now. This can flood your workers with hundreds
of runs when you first deploy a DAG. Set catchup=False unless you explicitly
need historical backfilling.
Operators
An operator defines what a task does. Airflow ships with dozens of built-in operators, and the provider packages (installed separately) add hundreds more for AWS, GCP, dbt, Spark, etc.
Core Built-in Operators
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator # was DummyOperator pre-2.4
# BashOperator: run a shell command
check_disk = BashOperator(
task_id="check_disk_space",
bash_command="df -h / | awk 'NR==2 {print $5}'",
env={"MY_VAR": "value"}, # inject env vars
cwd="/tmp", # working directory
)
# PythonOperator: call a Python function
def process_file(file_path: str, **context):
# context gives access to execution_date, ti (task instance), etc.
logical_date = context["logical_date"]
print(f"Processing {file_path} for {logical_date}")
process = PythonOperator(
task_id="process_file",
python_callable=process_file,
op_kwargs={"file_path": "/data/input.csv"}, # passed to callable
)
# EmptyOperator: a no-op, useful as a start/end anchor
start = EmptyOperator(task_id="pipeline_start")
end = EmptyOperator(task_id="pipeline_end")
start >> process >> end
Sensor Operators
Sensors are a special class of operator that wait for a condition to be met
before succeeding. They poll on a poke_interval (default 60s) up to a
timeout. In reschedule mode the task slot is freed between checks,
which is more resource-efficient for long waits.
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Wait for a local file to appear
wait_for_file = FileSensor(
task_id="wait_for_upload",
filepath="/data/incoming/{{ ds }}/input.csv",
poke_interval=30, # check every 30 seconds
timeout=60 * 60, # fail after 1 hour
mode="reschedule", # release worker slot between checks
)
# Wait for a task in another DAG to succeed
wait_for_upstream = ExternalTaskSensor(
task_id="wait_for_upstream_dag",
external_dag_id="upstream_pipeline",
external_task_id="final_load",
execution_delta=timedelta(hours=1), # upstream runs 1h earlier
mode="reschedule",
timeout=60 * 60 * 2,
)
# Wait for an S3 object (requires amazon provider)
wait_for_s3 = S3KeySensor(
task_id="wait_for_s3_file",
bucket_name="my-data-bucket",
bucket_key="raw/{{ ds }}/data.parquet",
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=60,
)
Provider Operators
Provider packages extend Airflow with operators for cloud services and third-party tools.
Install them separately: pip install apache-airflow-providers-amazon.
# Common provider packages
pip install apache-airflow-providers-amazon # S3, Redshift, EMR, Glue
pip install apache-airflow-providers-google # BigQuery, GCS, Dataflow, Vertex AI
pip install apache-airflow-providers-postgres # PostgresOperator, PostgresHook
pip install apache-airflow-providers-slack # SlackWebhookOperator
pip install apache-airflow-providers-http # SimpleHttpOperator
pip install apache-airflow-providers-dbt-cloud # dbt Cloud jobs
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
# Run SQL directly against a Postgres connection
run_sql = PostgresOperator(
task_id="create_daily_summary",
postgres_conn_id="postgres_warehouse", # defined in Admin > Connections
sql="""
INSERT INTO daily_summary (date, total_orders, total_revenue)
SELECT
'{{ ds }}'::date,
COUNT(*),
SUM(amount)
FROM orders
WHERE created_at::date = '{{ ds }}'::date
ON CONFLICT (date) DO UPDATE
SET total_orders = EXCLUDED.total_orders,
total_revenue = EXCLUDED.total_revenue;
""",
)
# BigQuery job
bq_job = BigQueryInsertJobOperator(
task_id="run_bq_transformation",
gcp_conn_id="google_cloud_default",
configuration={
"query": {
"query": "SELECT * FROM `project.dataset.table` WHERE date = '{{ ds }}'",
"useLegacySql": False,
"destinationTable": {
"projectId": "my-project",
"datasetId": "my_dataset",
"tableId": "output_{{ ds_nodash }}",
},
"writeDisposition": "WRITE_TRUNCATE",
}
},
)
Custom Operators
from typing import Any

from airflow.models import BaseOperator
class SlackNotifyOperator(BaseOperator):
"""
Sends a message to a Slack channel.
Args:
channel: Slack channel name (e.g., '#alerts')
message: Message text. Supports Jinja templating.
slack_conn_id: Airflow connection with Slack token.
"""
# List of fields that support Jinja templating
template_fields = ("message",)
def __init__(
self,
channel: str,
message: str,
slack_conn_id: str = "slack_default",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.channel = channel
self.message = message
self.slack_conn_id = slack_conn_id
def execute(self, context: dict) -> Any:
from airflow.providers.http.hooks.http import HttpHook
hook = HttpHook(method="POST", http_conn_id=self.slack_conn_id)
payload = {"channel": self.channel, "text": self.message}
self.log.info("Sending Slack message to %s", self.channel)
hook.run(endpoint="chat.postMessage", data=payload)
return {"channel": self.channel, "message": self.message}
# Usage
notify = SlackNotifyOperator(
task_id="notify_pipeline_complete",
channel="#data-alerts",
message="Pipeline {{ dag.dag_id }} completed for {{ ds }}",
)
TaskFlow API
Introduced in Airflow 2.0, the TaskFlow API lets you write DAGs as plain Python functions
decorated with @task. It handles XCom push/pull automatically and produces
cleaner, more testable code compared to the explicit operator style.
Basic @task Usage
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import requests
@dag(
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["api", "etl"],
)
def api_etl():
@task(retries=3, retry_delay=timedelta(minutes=1))
def extract(api_url: str) -> list[dict]:
"""Fetch records from API. Return value auto-pushed to XCom."""
resp = requests.get(api_url, timeout=30)
resp.raise_for_status()
return resp.json()["records"] # must be JSON-serializable
@task
def transform(records: list[dict]) -> list[dict]:
"""Clean and normalize records."""
return [
{
"id": r["id"],
"name": r["name"].strip().lower(),
"value": float(r.get("value", 0)),
}
for r in records
if r.get("id") # filter out records missing ID
]
@task
def load(records: list[dict]) -> int:
"""Insert records into DB. Returns number of rows inserted."""
# ... DB insert logic
print(f"Loaded {len(records)} records")
return len(records)
# Data flows as Python function arguments — XCom handled transparently
raw = extract(api_url="https://api.example.com/data")
clean = transform(raw)
count = load(clean)
api_etl()
@task.bash and @task.branch
from datetime import datetime
from airflow.decorators import dag, task
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def mixed_tasks():
# Run a bash command — return value is stdout (str)
@task.bash
def check_file() -> str:
return "test -f /data/input.csv && echo 'exists' || echo 'missing'"
# Branch based on logic — return the task_id(s) to follow
@task.branch
def route(file_status: str) -> str:
return "process_file" if file_status.strip() == "exists" else "skip_run"
@task
def process_file():
print("Processing file...")
@task
def skip_run():
print("Nothing to process today.")
status = check_file()
branch = route(status)
branch >> [process_file(), skip_run()]
mixed_tasks()
TaskFlow vs. Traditional Operator — When to Use Each
| Scenario | Prefer | Reason |
|---|---|---|
| Python-only logic | @task | Clean, no boilerplate, auto-XCom |
| Reusable library operator | Operator class | Encapsulates logic, shareable |
| Provider integrations (BQ, S3) | Provider Operator | Built-in retry, connection handling |
| Large data payloads | Either (use external storage) | XCom has size limits |
| Mixing @task + operators | Both | They compose naturally |
Dependencies & Branching
Basic Dependency Syntax
from airflow.operators.empty import EmptyOperator
from airflow.utils.helpers import chain, cross_downstream
a = EmptyOperator(task_id="a")
b = EmptyOperator(task_id="b")
c = EmptyOperator(task_id="c")
d = EmptyOperator(task_id="d")
e = EmptyOperator(task_id="e")
# >> (bitshift): downstream dependency
a >> b >> c # a then b then c
# << (reverse): upstream dependency
c << b # same as b >> c
# Fan-out (a triggers b and c in parallel)
a >> [b, c]
# Fan-in (b and c must both complete before d)
[b, c] >> d
# chain(): cleaner for linear sequences
chain(a, b, c, d)
# cross_downstream(): every left task must complete before every right task
cross_downstream([a, b], [c, d])
# Equivalent to: a >> c, a >> d, b >> c, b >> d
# Explicit set_upstream / set_downstream
d.set_upstream(c)
e.set_downstream([a, b])
BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def choose_path(**context):
"""Return task_id (or list of task_ids) to execute."""
hour = context["logical_date"].hour
if hour < 12:
return "morning_process"
elif hour < 18:
return "afternoon_process"
else:
return ["evening_process", "send_summary"] # can return multiple
branch = BranchPythonOperator(
task_id="route_by_time",
python_callable=choose_path,
)
morning = EmptyOperator(task_id="morning_process")
afternoon = EmptyOperator(task_id="afternoon_process")
evening = EmptyOperator(task_id="evening_process")
summary = EmptyOperator(task_id="send_summary")
# Tasks NOT chosen by branch are SKIPPED (not failed)
# Use trigger_rule="none_failed_min_one_success" on join tasks
join = EmptyOperator(
task_id="join",
trigger_rule="none_failed_min_one_success",
)
branch >> [morning, afternoon, evening, summary] >> join
Trigger Rules
By default a task waits for all upstream tasks to succeed
(all_success). Override with trigger_rule=:
| Rule | Triggers when... |
|---|---|
| all_success | All upstream tasks succeeded (default) |
| all_failed | All upstream tasks failed |
| all_done | All upstream tasks finished (any state) |
| one_success | At least one upstream task succeeded |
| one_failed | At least one upstream task failed |
| none_failed | No upstream tasks failed (success or skip OK) |
| none_skipped | No upstream tasks were skipped |
| none_failed_min_one_success | No failures, at least one success — use after branches |
| always | No conditions — run regardless of upstream state |
XComs
XCom (Cross-Communication) lets task instances share small pieces of data. Values are serialized and stored in the Airflow metadata database — which means they are limited in size (a few MB at most) and not suitable for passing DataFrames, large result sets, or binary blobs.
Explicit Push / Pull
from airflow.operators.python import PythonOperator
def push_data(ti, **context):
# ti = task instance, injected automatically via **context
record_count = 42_000
# Default key is "return_value"; use custom keys for multiple values
ti.xcom_push(key="record_count", value=record_count)
ti.xcom_push(key="source_file", value="/data/2024-01-01.csv")
def pull_data(ti, **context):
count = ti.xcom_pull(task_ids="extract_task", key="record_count")
path = ti.xcom_pull(task_ids="extract_task", key="source_file")
print(f"Loaded {count} records from {path}")
# Pulling the return value (implicit push via return statement)
prev_result = ti.xcom_pull(task_ids="some_other_task") # key defaults to "return_value"
extract_task = PythonOperator(task_id="extract_task", python_callable=push_data)
load_task = PythonOperator(task_id="load_task", python_callable=pull_data)
extract_task >> load_task
Auto-XCom with TaskFlow
@task
def extract() -> dict:
# Returning a value automatically pushes it as XCom key="return_value"
return {"rows": 1000, "file": "/data/raw.csv"}
@task
def load(data: dict):
# Passing a @task return value as an argument auto-pulls from XCom
print(f"Loading {data['rows']} rows from {data['file']}")
result = extract()
load(result) # Airflow resolves the XCom dependency automatically
XCom Limitations and Best Practices
- Small values only — counts, file paths, status strings, config dicts
- For large data — write to S3/GCS/blob storage in one task, pass only the path via XCom to the next
- Custom XCom backend — Airflow supports pluggable XCom backends (e.g., S3XComBackend) that serialize to object storage transparently
- Avoid XCom for control flow — use BranchPythonOperator instead of branching based on XCom values in downstream tasks
Variables, Connections & Hooks
Variables
Variables are key-value pairs stored in the metadata DB. Use them for environment-specific configuration that should not be hardcoded in DAG files.
from airflow.models import Variable
# Read a variable (fails loudly if missing — good for required config)
bucket = Variable.get("s3_data_bucket")
# Read with a default (optional config)
batch_size = int(Variable.get("etl_batch_size", default_var=1000))
# Read a JSON variable and deserialize automatically
config = Variable.get("pipeline_config", deserialize_json=True)
# config is now a Python dict
# Set a variable (usually done via UI or CLI, but possible in code)
Variable.set("last_run_date", "2024-01-01")
# CLI variable management
airflow variables set s3_data_bucket my-prod-bucket
airflow variables get s3_data_bucket
airflow variables list
airflow variables export variables.json
airflow variables import variables.json
Variable.get() at the top of a DAG file (outside a function) runs a DB
query every time the scheduler parses the file — multiple times per minute. Always call
Variable.get() inside task functions or use Jinja template syntax
{{ var.value.my_key }} instead.
Connections
Connections store credentials and endpoint info for external systems. They are referenced
by a conn_id string in operators and hooks.
# Add a connection via CLI
airflow connections add postgres_warehouse \
--conn-type postgres \
--conn-host db.example.com \
--conn-port 5432 \
--conn-login etl_user \
--conn-password "s3cr3t" \
--conn-schema warehouse
# Add via environment variable (preferred for production)
# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE}=conn-type://user:pass@host:port/schema
export AIRFLOW_CONN_POSTGRES_WAREHOUSE="postgresql://etl_user:[email protected]:5432/warehouse"
export AIRFLOW_CONN_AWS_DEFAULT="aws://AKID:SECRET_KEY@/?region_name=us-east-1"
Hooks
Hooks are the low-level connection classes used by operators internally.
You can also use them directly in PythonOperator or @task functions.
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.http.hooks.http import HttpHook
# PostgresHook: execute SQL, get a connection, or load a DataFrame
def query_db(**context):
hook = PostgresHook(postgres_conn_id="postgres_warehouse")
# Execute and fetch
records = hook.get_records("SELECT id, name FROM users WHERE active = true")
# Get a raw DBAPI connection
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("INSERT INTO audit_log (event) VALUES (%s)", ("pipeline_run",))
conn.commit()
# Load to/from pandas (requires pandas)
df = hook.get_pandas_df("SELECT * FROM daily_metrics WHERE date = %s", parameters=["2024-01-01"])
return len(df)
# S3Hook
def upload_to_s3(**context):
hook = S3Hook(aws_conn_id="aws_default")
hook.load_file(
filename="/tmp/output.csv",
key=f"processed/{context['ds']}/output.csv",
bucket_name="my-data-bucket",
replace=True,
)
# Check if file exists
exists = hook.check_for_key("raw/input.csv", bucket_name="my-data-bucket")
# HttpHook: call REST APIs
def call_api(**context):
hook = HttpHook(method="GET", http_conn_id="my_api")
response = hook.run(
endpoint=f"/data?date={context['ds']}",
headers={"Accept": "application/json"},
)
return response.json()
Secrets Backend
For production, store secrets in a proper secrets manager rather than the metadata DB:
# airflow.cfg / environment variable configuration for AWS Secrets Manager
export AIRFLOW__SECRETS__BACKEND="airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}'
# Store a connection in AWS Secrets Manager
aws secretsmanager create-secret \
--name "airflow/connections/postgres_warehouse" \
--secret-string "postgresql://etl_user:[email protected]:5432/warehouse"
Scheduling & Execution
execution_date vs. logical_date vs. data_interval
Airflow's scheduling model can be confusing at first. The key insight is that a DAG Run scheduled for a given time interval processes data from that interval, and typically runs after the interval ends.
| Term | Meaning |
|---|---|
| logical_date | The start of the data interval this run covers. Previously called execution_date. |
| data_interval_start | Same as logical_date — beginning of the data window |
| data_interval_end | End of the data window (= logical_date + schedule interval) |
| run_id | Unique identifier for the run, e.g. scheduled__2024-01-01T06:00:00+00:00 |
| ds | Jinja shorthand: logical_date.strftime('%Y-%m-%d') |
| ds_nodash | Same without dashes: 20240101 |
| ts | Full ISO 8601 timestamp of logical_date |
schedule="@daily" and start_date=2024-01-01 will
create its first run with logical_date=2024-01-01 at 2024-01-02 00:00
(after the interval ends). This is intentional — the run processes data from
2024-01-01, which is only complete once 2024-01-02 begins.
Backfilling
# Backfill DAG runs for a specific date range
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
daily_etl_pipeline
# Run with 4 parallel DAG runs
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--max-active-runs 4 \
daily_etl_pipeline
# Dry run: show what would be run without executing
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--dry-run \
daily_etl_pipeline
Time Zones
from datetime import datetime
import pendulum
# Always use timezone-aware datetimes for start_date
# pendulum is bundled with Airflow and is the recommended library
@dag(
schedule="0 9 * * 1-5", # 9 AM Mon-Fri
start_date=pendulum.datetime(2024, 1, 1, tz="America/New_York"),
catchup=False,
)
def business_hours_dag():
pass
Whatever timezone you use for start_date, Airflow converts everything to
UTC in the metadata DB. The UI displays in UTC by default; configure
default_ui_timezone in airflow.cfg to display in your local timezone.
Testing & Debugging
CLI Testing Commands
# Test an entire DAG run in-process (does not register the run in the metadata DB)
airflow dags test daily_etl_pipeline 2024-01-15
# Test a single task in isolation — ignores dependencies, records no state in the DB
airflow tasks test daily_etl_pipeline extract_sales 2024-01-15
# List all DAGs
airflow dags list
# List tasks in a DAG
airflow tasks list daily_etl_pipeline --tree
# Check for DAG import errors
airflow dags list-import-errors
# Print the DAG structure (for debugging dependencies)
airflow dags show daily_etl_pipeline # generates a DOT file
Unit Testing DAGs with pytest
# tests/test_daily_etl_dag.py
import pytest
from airflow.models import DagBag
@pytest.fixture(scope="module")
def dag_bag():
"""Load DAGs from the dags/ folder."""
return DagBag(dag_folder="dags/", include_examples=False)
class TestDailyEtlDag:
def test_dag_loaded(self, dag_bag):
"""DAG should be importable with no errors."""
assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"
assert "daily_etl_pipeline" in dag_bag.dags
def test_task_count(self, dag_bag):
dag = dag_bag.get_dag("daily_etl_pipeline")
assert len(dag.tasks) == 3
def test_task_ids(self, dag_bag):
dag = dag_bag.get_dag("daily_etl_pipeline")
task_ids = {t.task_id for t in dag.tasks}
assert task_ids == {"extract_sales", "transform_sales", "load_to_warehouse"}
def test_dependency_order(self, dag_bag):
dag = dag_bag.get_dag("daily_etl_pipeline")
extract = dag.get_task("extract_sales")
transform = dag.get_task("transform_sales")
load = dag.get_task("load_to_warehouse")
assert transform in extract.downstream_list
assert load in transform.downstream_list
def test_schedule(self, dag_bag):
dag = dag_bag.get_dag("daily_etl_pipeline")
assert str(dag.schedule_interval) == "0 6 * * *"
def test_catchup_disabled(self, dag_bag):
dag = dag_bag.get_dag("daily_etl_pipeline")
assert dag.catchup is False
# Unit test a callable used in a PythonOperator
from dags.daily_etl_pipeline import transform_data
class TestTransformData:
def test_strips_whitespace(self):
input_records = [{"id": 1, "name": " Alice ", "value": "42.5"}]
result = transform_data(input_records)
assert result[0]["name"] == "alice"
assert result[0]["value"] == 42.5
def test_filters_missing_id(self):
input_records = [{"id": None, "name": "Bob"}, {"id": 2, "name": "Carol"}]
result = transform_data(input_records)
assert len(result) == 1
assert result[0]["id"] == 2
Common Errors and Fixes
| Error | Cause | Fix |
|---|---|---|
| DAG not appearing in UI | Import error, wrong folder, paused | Run airflow dags list-import-errors; check scheduler logs |
| ModuleNotFoundError in scheduler | Package not installed in scheduler's Python env | Install package in scheduler container; rebuild Docker image |
| Tasks stuck in queued | No available worker slots, executor misconfigured | Check parallelism config; check Celery worker logs |
| XCom data missing | Task ran in a different DAG Run; wrong key | Verify task_ids and key in xcom_pull() |
| Sensor timeout | External condition never met within timeout window | Increase timeout; check external system; use mode="reschedule" |
| Jinja template not rendered | Value passed to non-templated field | Check template_fields on operator; use op_kwargs for Python callables |
Executors
The executor determines how and where task instances are run.
Configure it in airflow.cfg under [core] executor or via
AIRFLOW__CORE__EXECUTOR.
Executor Types
| Executor | Best For | Notes |
|---|---|---|
| SequentialExecutor | Development / testing only | Runs one task at a time; default with SQLite |
| LocalExecutor | Single-machine production, low-medium workloads | Spawns subprocesses; requires PostgreSQL or MySQL |
| CeleryExecutor | Distributed, scalable, multi-worker | Needs Redis or RabbitMQ broker + Celery workers |
| KubernetesExecutor | Cloud-native, task-level isolation | Each task runs in its own pod; zero idle workers |
| CeleryKubernetesExecutor | Hybrid: fast Celery tasks + isolated K8s tasks | Route per-task via queue="kubernetes" |
Concurrency Configuration
# airflow.cfg relevant settings (or use AIRFLOW__SECTION__KEY env vars)
# AIRFLOW__CORE__PARALLELISM
# Max total task instances running across all DAGs at once (default: 32)
AIRFLOW__CORE__PARALLELISM=32
# AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG
# Max running tasks per DAG at one time (default: 16)
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16
# AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG
# Max concurrent DAG runs per DAG (default: 16)
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=16
# Per-DAG overrides in DAG definition:
# max_active_tasks=8 # override MAX_ACTIVE_TASKS_PER_DAG for this DAG
# max_active_runs=2 # override MAX_ACTIVE_RUNS_PER_DAG for this DAG
# CeleryExecutor: number of parallel tasks per worker process
AIRFLOW__CELERY__WORKER_CONCURRENCY=16
KubernetesExecutor Basics
# airflow.cfg / environment
# AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
# AIRFLOW__KUBERNETES__NAMESPACE=airflow
# AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY=my-registry/airflow
# AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG=2.10.4
# Per-task pod overrides using executor_config
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator

heavy_task = PythonOperator(
task_id="heavy_computation",
python_callable=run_heavy_job,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={"cpu": "2", "memory": "4Gi"},
limits={"cpu": "4", "memory": "8Gi"},
),
)
]
)
)
},
)
Production Patterns
Dynamic DAG Generation
Generate multiple similar DAGs from a config list. Each DAG gets its own
dag_id and is registered independently.
# dags/dynamic_etl_dags.py
from datetime import datetime
from airflow.decorators import dag, task
# Config-driven DAG factory: one DAG per data source
DATA_SOURCES = [
    {"name": "salesforce", "table": "opportunities", "schedule": "0 7 * * *"},
    {"name": "hubspot", "table": "contacts", "schedule": "0 8 * * *"},
    {"name": "stripe", "table": "charges", "schedule": "0 */4 * * *"},
]

def make_etl_dag(name: str, table: str, schedule: str):
    """Factory function returns a configured DAG object."""

    @dag(
        dag_id=f"etl_{name}_{table}",
        schedule=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["etl", name],
    )
    def etl_dag():
        @task
        def extract() -> list[dict]:
            print(f"Extracting {table} from {name}")
            # ... real extraction logic
            return []

        @task
        def load(records: list[dict]):
            print(f"Loading {len(records)} records into {table}")

        load(extract())

    return etl_dag()

# Register all DAGs at module scope so Airflow discovers them automatically.
# Parameter names must match the dict keys for **source unpacking to work.
for source in DATA_SOURCES:
    make_etl_dag(**source)
Idempotent Tasks
Tasks should be safe to re-run. If a task fails partway through and is retried, it should produce the same result without duplication or corruption.
@task
def load_to_postgres(records: list[dict], **context):
    """
    Idempotent load: delete the partition first, then insert.
    Re-running this task for the same logical_date produces identical DB state.
    """
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    date = context["ds"]
    hook = PostgresHook(postgres_conn_id="postgres_warehouse")
    with hook.get_conn() as conn:
        with conn.cursor() as cur:
            # DELETE then INSERT (overwrite-partition pattern)
            cur.execute("DELETE FROM daily_sales WHERE date = %s", (date,))
            cur.executemany(
                "INSERT INTO daily_sales (date, product_id, revenue) VALUES (%s, %s, %s)",
                [(date, r["product_id"], r["revenue"]) for r in records],
            )
        conn.commit()
    return len(records)
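The delete-then-insert pattern can be verified outside Airflow. A minimal sketch using stdlib sqlite3 (the table and column names mirror the example above but are otherwise illustrative): running the load twice for the same date, as a retry would, leaves identical state.

```python
import sqlite3

def idempotent_load(conn, date, records):
    """Delete the date partition, then insert: re-runs leave identical state."""
    cur = conn.cursor()
    cur.execute("DELETE FROM daily_sales WHERE date = ?", (date,))
    cur.executemany(
        "INSERT INTO daily_sales (date, product_id, revenue) VALUES (?, ?, ?)",
        [(date, r["product_id"], r["revenue"]) for r in records],
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_sales (date TEXT, product_id TEXT, revenue REAL)")
rows = [{"product_id": "p1", "revenue": 10.0}, {"product_id": "p2", "revenue": 5.5}]

idempotent_load(conn, "2024-01-01", rows)
idempotent_load(conn, "2024-01-01", rows)  # simulate a retry

count = conn.execute("SELECT COUNT(*) FROM daily_sales").fetchone()[0]
print(count)  # 2 rows, not 4: the retry did not duplicate data
```

A naive append-only load would return 4 here; the delete step is what makes the retry safe.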
Retry and Timeout Configuration
from datetime import timedelta

from airflow.operators.python import PythonOperator

# Fine-grained retry config at the task level
flaky_api_call = PythonOperator(
    task_id="call_external_api",
    python_callable=call_api,
    retries=5,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,   # delays grow roughly 2, 4, 8, 16, 32 minutes
    max_retry_delay=timedelta(minutes=30),
    execution_timeout=timedelta(minutes=15),  # kill the task if it runs too long
)

# SLA: raise an alert if the task hasn't completed by this offset from the DAG run start
slow_job = PythonOperator(
    task_id="slow_transformation",
    python_callable=run_transformation,
    sla=timedelta(hours=2),  # alert if not done 2h after the DAG run starts
)
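As a rough model of what `retry_exponential_backoff` does, the delay doubles on each attempt and is capped by `max_retry_delay`. This is a sketch of the schedule, not Airflow's exact implementation (the real scheduler also applies jitter):

```python
from datetime import timedelta

def backoff_schedule(retries: int, retry_delay: timedelta, max_retry_delay: timedelta):
    """Approximate exponential-backoff delays: double per retry, capped at the max."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)
        delays.append(min(delay, max_retry_delay))
    return delays

# Same numbers as the flaky_api_call config above
schedule = backoff_schedule(5, timedelta(minutes=2), timedelta(minutes=30))
print([int(d.total_seconds() // 60) for d in schedule])  # [2, 4, 8, 16, 30]
```

Note the final delay is capped at 30 minutes rather than reaching 32.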
Alerting via Callbacks
from datetime import datetime

from airflow import DAG

def on_failure_callback(context):
    """Called when a task (or the DAG run) fails, depending on where it's attached."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    log_url = context["task_instance"].log_url
    date = context["ds"]
    message = (
        f":red_circle: *Task Failed*\n"
        f"DAG: `{dag_id}` | Task: `{task_id}` | Date: `{date}`\n"
        f"<{log_url}|View Logs>"
    )
    # Use the SlackWebhookOperator or a direct HTTP call here
    print(message)  # replace with an actual Slack webhook call

def on_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed for tasks: {[str(t) for t in task_list]}")

with DAG(
    dag_id="production_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=on_failure_callback,  # DAG-level failure callback
    sla_miss_callback=on_sla_miss_callback,
    default_args={
        "on_failure_callback": on_failure_callback,  # task-level
    },
) as dag:
    pass
Pools for Resource Management
Pools limit how many tasks can use a shared resource concurrently — useful for rate-limiting API calls or protecting a downstream database.
# Create a pool via the CLI
airflow pools set external_api_pool 5 "Limit concurrent external API calls to 5"

# Or bulk-import pools from a JSON file (usually done in a setup script)
airflow pools import pools.json

# Assign a task to a pool
api_task = PythonOperator(
    task_id="call_rate_limited_api",
    python_callable=call_api,
    pool="external_api_pool",  # only 5 tasks from this pool run at once
    pool_slots=1,              # this task takes 1 slot (default)
)

# A "heavy" task might take multiple slots
heavy_job = PythonOperator(
    task_id="heavy_spark_job",
    python_callable=run_spark,
    pool="spark_pool",
    pool_slots=3,  # counts as 3 slots
)
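The `pools.json` file consumed by `airflow pools import` maps pool names to slot counts and descriptions. A sketch matching the pools used above (the `spark_pool` slot count here is an illustrative assumption, not from this guide):

```json
{
  "external_api_pool": {
    "slots": 5,
    "description": "Limit concurrent external API calls to 5"
  },
  "spark_pool": {
    "slots": 6,
    "description": "Cap concurrent Spark workloads"
  }
}
```

`airflow pools export pools.json` produces a file in the same format, which is a convenient way to confirm the exact shape your Airflow version expects.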
Common Production Use Cases
ETL Pipeline: API → Transform → Warehouse
from datetime import datetime, timedelta
import json

import requests

from airflow.decorators import dag, task
from airflow.models import Variable

@dag(
    dag_id="orders_etl",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    tags=["etl", "orders"],
)
def orders_etl():
    """
    Daily orders ETL:
    1. Extract orders from the Orders API for the previous day
    2. Transform: normalize fields, compute derived metrics
    3. Load to the PostgreSQL warehouse (idempotent delete-then-insert)
    """

    @task(pool="external_api_pool")
    def extract_orders(**context) -> str:
        """
        Fetch orders for the data interval. Returns the S3 path, not the data
        itself, to avoid hitting XCom size limits.
        """
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        date = context["ds"]
        resp = requests.get(
            "https://api.orders.example.com/orders",
            params={"date": date, "limit": 10_000},
            headers={"Authorization": f"Bearer {Variable.get('orders_api_token')}"},
            timeout=60,
        )
        resp.raise_for_status()
        orders = resp.json()["data"]
        # Write to S3 staging; pass only the path via XCom
        s3_path = f"staging/orders/{date}/raw.json"
        S3Hook(aws_conn_id="aws_default").load_string(
            string_data=json.dumps(orders),
            key=s3_path,
            bucket_name="etl-staging",
            replace=True,
        )
        return s3_path

    @task
    def transform_orders(s3_path: str, **context) -> str:
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id="aws_default")
        raw = json.loads(hook.read_key(s3_path, bucket_name="etl-staging"))
        transformed = []
        for order in raw:
            transformed.append({
                "order_id": order["id"],
                "customer_id": order["customer"]["id"],
                "total_usd": round(sum(item["price"] * item["qty"] for item in order["items"]), 2),
                "status": order["status"].lower(),
                "created_at": order["created_at"],
                "date": context["ds"],
            })
        out_path = s3_path.replace("raw.json", "transformed.json")
        hook.load_string(
            string_data=json.dumps(transformed),
            key=out_path,
            bucket_name="etl-staging",
            replace=True,
        )
        return out_path

    @task
    def load_to_warehouse(s3_path: str, **context) -> int:
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        records = json.loads(
            S3Hook(aws_conn_id="aws_default").read_key(s3_path, bucket_name="etl-staging")
        )
        date = context["ds"]
        hook = PostgresHook(postgres_conn_id="postgres_warehouse")
        with hook.get_conn() as conn:
            with conn.cursor() as cur:
                # Idempotent: delete-then-insert for the partition
                cur.execute("DELETE FROM orders WHERE date = %s", (date,))
                cur.executemany(
                    """
                    INSERT INTO orders (order_id, customer_id, total_usd, status, created_at, date)
                    VALUES (%(order_id)s, %(customer_id)s, %(total_usd)s, %(status)s, %(created_at)s, %(date)s)
                    """,
                    records,
                )
            conn.commit()
        return len(records)

    raw_path = extract_orders()
    clean_path = transform_orders(raw_path)
    load_to_warehouse(clean_path)

orders_etl()
Data Quality Checks with SQL Assertions
from datetime import datetime

from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.operators.empty import EmptyOperator

@dag(
    dag_id="orders_with_dq_checks",
    schedule="0 3 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def orders_with_dq():
    load_complete = EmptyOperator(task_id="load_complete")

    @task
    def run_dq_checks(**context) -> dict:
        """
        Run data quality checks after the load.
        Raises AirflowException to fail the task if a check fails.
        """
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        date = context["ds"]
        hook = PostgresHook(postgres_conn_id="postgres_warehouse")
        checks = {}

        # Check 1: row count should be > 0
        row_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s", parameters=(date,)
        )[0]
        checks["row_count"] = row_count
        if row_count == 0:
            raise AirflowException(f"DQ FAIL: zero rows loaded for date {date}")

        # Check 2: no null order_ids
        null_ids = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s AND order_id IS NULL",
            parameters=(date,),
        )[0]
        checks["null_order_ids"] = null_ids
        if null_ids > 0:
            raise AirflowException(f"DQ FAIL: {null_ids} null order_ids for date {date}")

        # Check 3: revenue should be non-negative
        negative_revenue = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date = %s AND total_usd < 0",
            parameters=(date,),
        )[0]
        checks["negative_revenue"] = negative_revenue
        if negative_revenue > 0:
            raise AirflowException(f"DQ FAIL: {negative_revenue} orders with negative revenue")

        print(f"DQ checks passed: {checks}")
        return checks

    @task
    def publish_to_reporting():
        print("All checks passed, publishing to reporting layer")

    load_complete >> run_dq_checks() >> publish_to_reporting()

orders_with_dq()
ML Pipeline: Train → Evaluate → Conditional Deploy
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

@dag(
    dag_id="model_training_pipeline",
    schedule="@weekly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
)
def model_training():
    @task
    def prepare_training_data(**context) -> str:
        """Extract and prepare training data. Returns the dataset path."""
        date = context["ds"]
        print(f"Preparing training data up to {date}")
        # ... data preparation logic
        return f"s3://ml-bucket/training-data/{date}/dataset.parquet"

    @task
    def train_model(data_path: str) -> dict:
        """Train the model. Returns metrics."""
        print(f"Training on {data_path}")
        # ... training logic (could submit a SageMaker/Vertex job)
        return {
            "accuracy": 0.923,
            "f1_score": 0.911,
            "model_path": "s3://ml-bucket/models/latest/model.pkl",
            "baseline_accuracy": 0.900,
        }

    def decide_deploy(ti, **context) -> str:
        """Branch: deploy if the model beats the baseline, else reject."""
        metrics = ti.xcom_pull(task_ids="train_model")
        if metrics["accuracy"] > metrics["baseline_accuracy"]:
            return "deploy_model"
        return "reject_model"

    branch = BranchPythonOperator(
        task_id="evaluate_and_decide",
        python_callable=decide_deploy,
    )

    @task(trigger_rule="none_failed_min_one_success")
    def deploy_model(ti=None, **context):
        metrics = ti.xcom_pull(task_ids="train_model")
        print(f"Deploying model with accuracy={metrics['accuracy']:.3f}")
        # ... deployment logic (update the serving endpoint, etc.)

    @task
    def reject_model(ti=None, **context):
        metrics = ti.xcom_pull(task_ids="train_model")
        print(f"Rejecting model: accuracy={metrics['accuracy']:.3f} did not beat baseline")

    notify_done = EmptyOperator(
        task_id="notify_done",
        trigger_rule="none_failed_min_one_success",
    )

    data_path = prepare_training_data()
    metrics = train_model(data_path)
    metrics >> branch
    branch >> [deploy_model(), reject_model()] >> notify_done

model_training()
Report Generation: Query → Template → Send
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    dag_id="weekly_business_report",
    schedule="0 8 * * 1",  # Monday 8 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["reports", "email"],
)
def weekly_business_report():
    @task
    def gather_metrics(**context) -> dict:
        """Query key business metrics for the past week."""
        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id="postgres_warehouse")
        # Use data_interval_start/end for the correct week window
        start = context["data_interval_start"].strftime("%Y-%m-%d")
        end = context["data_interval_end"].strftime("%Y-%m-%d")
        revenue = hook.get_first(
            "SELECT COALESCE(SUM(total_usd), 0) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]
        order_count = hook.get_first(
            "SELECT COUNT(*) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]
        new_customers = hook.get_first(
            "SELECT COUNT(DISTINCT customer_id) FROM orders WHERE date >= %s AND date < %s",
            parameters=(start, end),
        )[0]
        return {
            "week_start": start,
            "week_end": end,
            "revenue": float(revenue),
            "order_count": int(order_count),
            "new_customers": int(new_customers),
            "avg_order_value": round(float(revenue) / max(int(order_count), 1), 2),
        }

    @task
    def render_report(metrics: dict) -> str:
        """Render the HTML report from the metrics dict."""
        html = f"""
        <html><body style="font-family: sans-serif; max-width: 600px; margin: 0 auto;">
          <h2>Weekly Business Report — {metrics['week_start']} to {metrics['week_end']}</h2>
          <table border="1" cellpadding="8" style="border-collapse: collapse; width: 100%;">
            <tr><th>Metric</th><th>Value</th></tr>
            <tr><td>Total Revenue</td><td>${metrics['revenue']:,.2f}</td></tr>
            <tr><td>Orders</td><td>{metrics['order_count']:,}</td></tr>
            <tr><td>New Customers</td><td>{metrics['new_customers']:,}</td></tr>
            <tr><td>Avg Order Value</td><td>${metrics['avg_order_value']:,.2f}</td></tr>
          </table>
        </body></html>
        """
        return html

    @task
    def send_email_report(html: str, **context):
        """Send the report via Airflow's built-in SMTP email helper."""
        from airflow.utils.email import send_email

        send_email(
            to=["[email protected]", "[email protected]"],
            subject=f"Weekly Business Report — {context['ds']}",
            html_content=html,
        )
        print("Report sent successfully")

    metrics = gather_metrics()
    html = render_report(metrics)
    send_email_report(html)

weekly_business_report()
In classic operators such as BashOperator and PostgresOperator, template strings like
{{ ds }} are rendered automatically because those fields are listed in
template_fields. In @task functions, access the same values through the runtime
context instead: declare **context in your function signature and read, e.g.,
context["ds"].
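The two access styles side by side, as a sketch (the task names and the echoed command are illustrative, not from this guide's pipelines):

```python
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# Classic operator: bash_command is in BashOperator.template_fields,
# so the {{ ds }} placeholder is Jinja-rendered at runtime
echo_date = BashOperator(
    task_id="echo_date",
    bash_command="echo 'processing {{ ds }}'",
)

# TaskFlow: the same value comes from the context dict
@task
def print_date(**context):
    print(f"processing {context['ds']}")
```

Both tasks end up with the same logical date string; only the mechanism differs.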