搬瓦工部署 Apache Airflow 工作流调度平台教程
Apache Airflow 是由 Airbnb 开发的开源工作流调度平台,广泛用于数据工程领域。它允许你使用 Python 代码定义工作流(DAG),实现任务的自动化编排、调度和监控。无论是 ETL 数据管道、定时任务还是机器学习流水线,Airflow 都能胜任。本文将介绍如何在搬瓦工 VPS 上通过 Docker 部署 Airflow。部署前请确保已安装 Docker 和 Docker Compose。
一、Airflow 核心概念
- DAG(有向无环图):定义任务之间的依赖关系和执行顺序。
- Operator:执行具体操作的单元,如 BashOperator、PythonOperator、HttpOperator 等。
- Scheduler:调度器负责解析 DAG 并在合适的时间触发任务。
- Worker:实际执行任务的进程。
- Web Server:提供 Web UI 用于监控和管理 DAG。
- XCom:任务间的数据传递机制。
二、系统要求
- 操作系统:Ubuntu 20.04+ 或 Debian 11+。
- 内存:至少 4GB(Airflow 组件较多,内存需求较高)。
- 磁盘:至少 15GB 可用空间。
- Docker:已安装 Docker 和 Docker Compose。
建议选择搬瓦工 THE PLAN 或更高方案。购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买链接:bwh81.net。
三、创建项目目录
mkdir -p /opt/airflow/{dags,logs,plugins,config}
cd /opt/airflow
四、创建环境变量文件
cat > /opt/airflow/.env <<'EOF'
AIRFLOW_UID=50000
AIRFLOW_GID=0
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=your_admin_password
AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow_db_pass@airflow-db:5432/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow_db_pass@airflow-db:5432/airflow
AIRFLOW__CELERY__BROKER_URL=redis://airflow-cache:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=false
EOF
生成 Fernet Key:
python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
五、编写 Docker Compose 配置
cat > /opt/airflow/docker-compose.yml <<'EOF'
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8-python3.11
env_file: .env
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
depends_on:
- airflow-db
- airflow-cache
restart: always
services:
airflow-db:
image: postgres:15
restart: always
environment:
POSTGRES_DB: airflow
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow_db_pass
volumes:
- airflow_db_data:/var/lib/postgresql/data
airflow-cache:
image: redis:7-alpine
restart: always
airflow-init:
<<: *airflow-common
command: >
bash -c "airflow db migrate && airflow users create
--username admin --password your_admin_password
--firstname Admin --lastname User
--role Admin --email admin@example.com"
restart: "no"
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
airflow-scheduler:
<<: *airflow-common
command: scheduler
airflow-worker:
<<: *airflow-common
command: celery worker
volumes:
airflow_db_data:
EOF
六、启动服务
cd /opt/airflow
# 先执行初始化
docker compose up airflow-init
# 启动所有服务
docker compose up -d
检查服务状态:
docker compose ps
七、编写第一个 DAG
在 dags 目录下创建示例 DAG 文件:
cat > /opt/airflow/dags/example_dag.py <<'EOF'
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'admin',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'example_daily_task',
default_args=default_args,
description='A simple example DAG',
schedule_interval='0 2 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
task1 = BashOperator(
task_id='print_date',
bash_command='date',
)
task2 = BashOperator(
task_id='check_disk',
bash_command='df -h',
)
task1 >> task2
EOF
DAG 文件保存后,Airflow Scheduler 会在几秒内自动检测并加载。
八、配置反向代理
cat > /etc/nginx/sites-available/airflow <<'EOF'
server {
listen 80;
server_name your_domain.com;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
EOF
ln -s /etc/nginx/sites-available/airflow /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com
九、资源优化
在 VPS 资源有限的情况下,可以考虑使用 LocalExecutor 替代 CeleryExecutor,这样不需要 Redis 和独立的 Worker 进程:
AIRFLOW__CORE__EXECUTOR=LocalExecutor
同时可以减少 Scheduler 的解析频率以降低 CPU 使用:
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=60
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30
十、常见问题
DAG 未出现在 Web UI 中
检查 DAG 文件是否有 Python 语法错误:
docker compose exec airflow-webserver python /opt/airflow/dags/example_dag.py
任务执行失败
在 Web UI 中点击失败的任务实例,查看日志获取详细错误信息。常见原因包括依赖包缺失或网络连接问题。
总结
Apache Airflow 是数据工程领域的标准工作流调度工具,功能强大但资源消耗较高。如果你的场景较为简单,可以考虑更轻量的 Prefect 或 Dagster。选购搬瓦工 VPS 请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣。