搬瓦工部署 Prefect 数据管道编排平台教程
Prefect 是新一代的工作流编排平台,被称为 Airflow 的现代替代品。相比 Airflow 复杂的 DAG 定义方式,Prefect 采用更加 Pythonic 的编程模型,只需要在普通 Python 函数上添加装饰器即可将其转变为可调度、可监控的工作流。本文将介绍如何在搬瓦工 VPS 上部署自托管的 Prefect Server。部署前请确保已安装 Docker 和 Docker Compose。
一、Prefect 与 Airflow 的区别
- 编程模型:Prefect 使用装饰器,Airflow 使用 Operator 类。Prefect 代码更简洁,学习曲线更低。
- 动态工作流:Prefect 原生支持动态任务生成,Airflow 的 DAG 必须在解析时确定。
- 重试机制:Prefect 内置细粒度的重试和缓存策略。
- 部署方式:Prefect 支持本地执行、Docker 容器和 Kubernetes,更加灵活。
- 资源消耗:Prefect Server 比 Airflow 轻量得多,适合中小规模场景。
二、系统要求
- 操作系统:Ubuntu 20.04+ 或 Debian 11+。
- 内存:至少 2GB,推荐 4GB。
- 磁盘:至少 10GB 可用空间。
- Docker:已安装 Docker 和 Docker Compose。
搬瓦工 VPS 方案选择请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买入口:bwh81.net。
三、创建项目目录
mkdir -p /opt/prefect/flows && cd /opt/prefect
四、编写 Docker Compose 配置
cat > /opt/prefect/docker-compose.yml <<'EOF'
version: '3.8'
services:
prefect-db:
image: postgres:15
restart: always
environment:
POSTGRES_DB: prefect
POSTGRES_USER: prefect
POSTGRES_PASSWORD: your_db_password_here
volumes:
- prefect_db_data:/var/lib/postgresql/data
prefect-server:
image: prefecthq/prefect:2-python3.11
restart: always
command: prefect server start --host 0.0.0.0
ports:
- "4200:4200"
environment:
PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:your_db_password_here@prefect-db:5432/prefect
PREFECT_SERVER_API_HOST: 0.0.0.0
PREFECT_SERVER_API_PORT: 4200
depends_on:
- prefect-db
prefect-worker:
image: prefecthq/prefect:2-python3.11
restart: always
command: prefect worker start --pool default-agent-pool --type process
environment:
PREFECT_API_URL: http://prefect-server:4200/api
depends_on:
- prefect-server
volumes:
- ./flows:/opt/prefect/flows
volumes:
prefect_db_data:
EOF
五、启动服务
cd /opt/prefect
docker compose up -d
验证服务状态:
docker compose ps
docker compose logs prefect-server
访问 http://your_server_ip:4200 即可看到 Prefect 的 Web UI。
六、编写第一个 Flow
在 flows 目录下创建示例 Flow:
cat > /opt/prefect/flows/example_flow.py <<'EOF'
from prefect import flow, task
from prefect.logging import get_run_logger
import httpx
from datetime import datetime
@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
logger = get_run_logger()
response = httpx.get(url)
logger.info(f"Fetched data from {url}, status: {response.status_code}")
return response.json()
@task
def process_data(data: dict) -> str:
logger = get_run_logger()
result = f"Processed at {datetime.now()}: {len(data)} items"
logger.info(result)
return result
@flow(name="example-data-pipeline")
def data_pipeline():
data = fetch_data("https://jsonplaceholder.typicode.com/posts")
result = process_data(data)
return result
if __name__ == "__main__":
data_pipeline()
EOF
七、部署和调度 Flow
进入 Worker 容器执行部署:
# 进入容器
docker compose exec prefect-worker bash
# 部署 Flow(设置 Cron 调度)
cd /opt/prefect/flows
python -c "
from example_flow import data_pipeline
data_pipeline.serve(
name='daily-data-pipeline',
cron='0 8 * * *'
)
"
八、配置反向代理
cat > /etc/nginx/sites-available/prefect <<'EOF'
server {
listen 80;
server_name your_domain.com;
location / {
proxy_pass http://127.0.0.1:4200;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
EOF
ln -s /etc/nginx/sites-available/prefect /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com
九、监控和日志
Prefect Web UI 提供丰富的监控功能:
- Flow Runs:查看所有 Flow 的执行历史和状态。
- Task Runs:追踪每个 Task 的执行时间和结果。
- Logs:实时查看运行日志。
- Notifications:配置失败通知到 Slack 或邮件。
十、常见问题
Worker 无法连接 Server
确保 PREFECT_API_URL 配置正确。如果使用域名,确保 DNS 已正确解析。
Flow 执行超时
在 Flow 或 Task 装饰器中设置超时时间:
@task(timeout_seconds=300)
def long_running_task():
pass
总结
Prefect 是 Airflow 的优秀替代品,编程模型更现代,部署和维护也更简单。对于中小规模的数据管道编排需求,Prefect 是非常好的选择。如果你需要更传统的工作流调度工具,可以参考 Apache Airflow;如需面向数据资产的编排,参考 Dagster。选购搬瓦工 VPS 使用优惠码 NODESEEK2026 可享 6.77% 折扣。