搬瓦工部署 Prefect 数据管道编排平台教程

Prefect 是新一代的工作流编排平台,被称为 Airflow 的现代替代品。相比 Airflow 复杂的 DAG 定义方式,Prefect 采用更加 Pythonic 的编程模型,只需要在普通 Python 函数上添加装饰器即可将其转变为可调度、可监控的工作流。本文将介绍如何在搬瓦工 VPS 上部署自托管的 Prefect Server。部署前请确保已安装 Docker 和 Docker Compose

一、Prefect 与 Airflow 的区别

  • 编程模型:Prefect 使用装饰器,Airflow 使用 Operator 类。Prefect 代码更简洁,学习曲线更低。
  • 动态工作流:Prefect 原生支持动态任务生成,Airflow 的 DAG 必须在解析时确定。
  • 重试机制:Prefect 内置细粒度的重试和缓存策略。
  • 部署方式:Prefect 支持本地执行、Docker 容器和 Kubernetes,更加灵活。
  • 资源消耗:Prefect Server 比 Airflow 轻量得多,适合中小规模场景。

二、系统要求

  • 操作系统:Ubuntu 20.04+ 或 Debian 11+。
  • 内存:至少 2GB,推荐 4GB。
  • 磁盘:至少 10GB 可用空间。
  • Docker:已安装 Docker 和 Docker Compose。

搬瓦工 VPS 方案选择请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买入口:bwh81.net

三、创建项目目录

mkdir -p /opt/prefect/flows && cd /opt/prefect

四、编写 Docker Compose 配置

cat > /opt/prefect/docker-compose.yml <<'EOF'
version: '3.8'

services:
  prefect-db:
    image: postgres:15
    restart: always
    environment:
      POSTGRES_DB: prefect
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: your_db_password_here
    volumes:
      - prefect_db_data:/var/lib/postgresql/data

  prefect-server:
    image: prefecthq/prefect:2-python3.11
    restart: always
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:your_db_password_here@prefect-db:5432/prefect
      PREFECT_SERVER_API_HOST: 0.0.0.0
      PREFECT_SERVER_API_PORT: 4200
    depends_on:
      - prefect-db

  prefect-worker:
    image: prefecthq/prefect:2-python3.11
    restart: always
    command: prefect worker start --pool default-agent-pool --type process
    environment:
      PREFECT_API_URL: http://prefect-server:4200/api
    depends_on:
      - prefect-server
    volumes:
      - ./flows:/opt/prefect/flows

volumes:
  prefect_db_data:
EOF

五、启动服务

cd /opt/prefect
docker compose up -d

验证服务状态:

docker compose ps
docker compose logs prefect-server

访问 http://your_server_ip:4200 即可看到 Prefect 的 Web UI。

六、编写第一个 Flow

flows 目录下创建示例 Flow:

cat > /opt/prefect/flows/example_flow.py <<'EOF'
from prefect import flow, task
from prefect.logging import get_run_logger
import httpx
from datetime import datetime

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    logger = get_run_logger()
    response = httpx.get(url)
    logger.info(f"Fetched data from {url}, status: {response.status_code}")
    return response.json()

@task
def process_data(data: dict) -> str:
    logger = get_run_logger()
    result = f"Processed at {datetime.now()}: {len(data)} items"
    logger.info(result)
    return result

@flow(name="example-data-pipeline")
def data_pipeline():
    data = fetch_data("https://jsonplaceholder.typicode.com/posts")
    result = process_data(data)
    return result

if __name__ == "__main__":
    data_pipeline()
EOF

七、部署和调度 Flow

进入 Worker 容器执行部署:

# 进入容器
docker compose exec prefect-worker bash

# 部署 Flow(设置 Cron 调度)
cd /opt/prefect/flows
python -c "
from example_flow import data_pipeline
data_pipeline.serve(
    name='daily-data-pipeline',
    cron='0 8 * * *'
)
"

八、配置反向代理

cat > /etc/nginx/sites-available/prefect <<'EOF'
server {
    listen 80;
    server_name your_domain.com;

    location / {
        proxy_pass http://127.0.0.1:4200;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
EOF

ln -s /etc/nginx/sites-available/prefect /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com

九、监控和日志

Prefect Web UI 提供丰富的监控功能:

  • Flow Runs:查看所有 Flow 的执行历史和状态。
  • Task Runs:追踪每个 Task 的执行时间和结果。
  • Logs:实时查看运行日志。
  • Notifications:配置失败通知到 Slack 或邮件。

十、常见问题

Worker 无法连接 Server

确保 PREFECT_API_URL 配置正确。如果使用域名,确保 DNS 已正确解析。

Flow 执行超时

在 Flow 或 Task 装饰器中设置超时时间:

@task(timeout_seconds=300)
def long_running_task():
    pass

总结

Prefect 是 Airflow 的优秀替代品,编程模型更现代,部署和维护也更简单。对于中小规模的数据管道编排需求,Prefect 是非常好的选择。如果你需要更传统的工作流调度工具,可以参考 Apache Airflow;如需面向数据资产的编排,参考 Dagster。选购搬瓦工 VPS 使用优惠码 NODESEEK2026 可享 6.77% 折扣。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。