搬瓦工部署 Apache Airflow 工作流调度平台教程

Apache Airflow 是由 Airbnb 开发的开源工作流调度平台,广泛用于数据工程领域。它允许你使用 Python 代码定义工作流(DAG),实现任务的自动化编排、调度和监控。无论是 ETL 数据管道、定时任务还是机器学习流水线,Airflow 都能胜任。本文将介绍如何在搬瓦工 VPS 上通过 Docker 部署 Airflow。部署前请确保已安装 Docker 和 Docker Compose

一、Airflow 核心概念

  • DAG(有向无环图):定义任务之间的依赖关系和执行顺序。
  • Operator:执行具体操作的单元,如 BashOperator、PythonOperator、HttpOperator 等。
  • Scheduler:调度器负责解析 DAG 并在合适的时间触发任务。
  • Worker:实际执行任务的进程。
  • Web Server:提供 Web UI 用于监控和管理 DAG。
  • XCom:任务间的数据传递机制。

二、系统要求

  • 操作系统:Ubuntu 20.04+ 或 Debian 11+。
  • 内存:至少 4GB(Airflow 组件较多,内存需求较高)。
  • 磁盘:至少 15GB 可用空间。
  • Docker:已安装 Docker 和 Docker Compose。

建议选择搬瓦工 THE PLAN 或更高方案。购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买链接:bwh81.net

三、创建项目目录

mkdir -p /opt/airflow/{dags,logs,plugins,config}
cd /opt/airflow

四、创建环境变量文件

cat > /opt/airflow/.env <<'EOF'
AIRFLOW_UID=50000
AIRFLOW_GID=0
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=your_admin_password
AIRFLOW__CORE__FERNET_KEY=your_fernet_key_here
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow_db_pass@airflow-db:5432/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow_db_pass@airflow-db:5432/airflow
AIRFLOW__CELERY__BROKER_URL=redis://airflow-cache:6379/0
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__WEBSERVER__EXPOSE_CONFIG=false
EOF

生成 Fernet Key:

python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

五、编写 Docker Compose 配置

cat > /opt/airflow/docker-compose.yml <<'EOF'
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8-python3.11
  env_file: .env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  depends_on:
    - airflow-db
    - airflow-cache
  restart: always

services:
  airflow-db:
    image: postgres:15
    restart: always
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow_db_pass
    volumes:
      - airflow_db_data:/var/lib/postgresql/data

  airflow-cache:
    image: redis:7-alpine
    restart: always

  airflow-init:
    <<: *airflow-common
    command: >
      bash -c "airflow db migrate && airflow users create
      --username admin --password your_admin_password
      --firstname Admin --lastname User
      --role Admin --email admin@example.com"
    restart: "no"

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-worker:
    <<: *airflow-common
    command: celery worker

volumes:
  airflow_db_data:
EOF

六、启动服务

cd /opt/airflow

# 先执行初始化
docker compose up airflow-init

# 启动所有服务
docker compose up -d

检查服务状态:

docker compose ps

七、编写第一个 DAG

dags 目录下创建示例 DAG 文件:

cat > /opt/airflow/dags/example_dag.py <<'EOF'
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'admin',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'example_daily_task',
    default_args=default_args,
    description='A simple example DAG',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    task2 = BashOperator(
        task_id='check_disk',
        bash_command='df -h',
    )

    task1 >> task2
EOF

DAG 文件保存后,Airflow Scheduler 会在几秒内自动检测并加载。

八、配置反向代理

cat > /etc/nginx/sites-available/airflow <<'EOF'
server {
    listen 80;
    server_name your_domain.com;

    location / {
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
EOF

ln -s /etc/nginx/sites-available/airflow /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com

九、资源优化

在 VPS 资源有限的情况下,可以考虑使用 LocalExecutor 替代 CeleryExecutor,这样不需要 Redis 和独立的 Worker 进程:

AIRFLOW__CORE__EXECUTOR=LocalExecutor

同时可以减少 Scheduler 的解析频率以降低 CPU 使用:

AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=60
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30

十、常见问题

DAG 未出现在 Web UI 中

检查 DAG 文件是否有 Python 语法错误:

docker compose exec airflow-webserver python /opt/airflow/dags/example_dag.py

任务执行失败

在 Web UI 中点击失败的任务实例,查看日志获取详细错误信息。常见原因包括依赖包缺失或网络连接问题。

总结

Apache Airflow 是数据工程领域的标准工作流调度工具,功能强大但资源消耗较高。如果你的场景较为简单,可以考虑更轻量的 PrefectDagster。选购搬瓦工 VPS 请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。