搬瓦工部署 Dagster 数据编排平台教程
Dagster 是一款面向数据资产(Software-defined Assets)的现代数据编排平台。与 Airflow 侧重于任务调度不同,Dagster 的核心理念是将数据管道中的每个产出物(表、文件、模型等)都视为一等公民,通过声明式的方式定义数据资产之间的依赖关系。本文将介绍如何在搬瓦工 VPS 上通过 Docker 部署 Dagster。部署前请确保已安装 Docker 和 Docker Compose。
一、Dagster 核心概念
- Asset(数据资产):管道中产出的数据对象,如数据库表、Parquet 文件或 ML 模型。
- Op(操作):执行具体计算逻辑的函数单元。
- Job:由多个 Op 组成的可执行管道。
- Schedule:基于 Cron 表达式的定时调度。
- Sensor:基于外部事件触发的调度,如文件变更或 API 回调。
- Resource:外部服务的连接配置,如数据库连接、API 客户端等。
- IO Manager:管理数据资产的读写逻辑。
二、系统要求
- 操作系统:Ubuntu 20.04+ 或 Debian 11+。
- 内存:至少 2GB,推荐 4GB。
- 磁盘:至少 10GB 可用空间。
- Docker:已安装 Docker 和 Docker Compose。
选购搬瓦工 VPS 请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买入口:bwh81.net。
三、创建项目结构
mkdir -p /opt/dagster/{dagster_home,user_code}
cd /opt/dagster
创建 Dagster 配置文件:
cat > /opt/dagster/dagster_home/dagster.yaml <<'EOF'
scheduler:
module: dagster.core.scheduler
class: DagsterDaemonScheduler
run_coordinator:
module: dagster.core.run_coordinator
class: QueuedRunCoordinator
run_launcher:
module: dagster.core.launcher
class: DefaultRunLauncher
storage:
postgres:
postgres_db:
hostname: dagster-db
username: dagster
password: your_db_password_here
db_name: dagster
telemetry:
enabled: false
EOF
四、创建示例数据资产
cat > /opt/dagster/user_code/definitions.py <<'EOF'
from dagster import (
asset, Definitions, ScheduleDefinition,
define_asset_job, AssetSelection
)
import json
from datetime import datetime
@asset(description="获取原始数据")
def raw_data():
data = {
"timestamp": datetime.now().isoformat(),
"records": [
{"id": i, "value": i * 10}
for i in range(100)
]
}
return data
@asset(description="清洗后的数据")
def cleaned_data(raw_data):
records = raw_data["records"]
cleaned = [r for r in records if r["value"] > 50]
return {"count": len(cleaned), "data": cleaned}
@asset(description="数据分析报告")
def analysis_report(cleaned_data):
count = cleaned_data["count"]
values = [r["value"] for r in cleaned_data["data"]]
return {
"total_records": count,
"avg_value": sum(values) / len(values) if values else 0,
"max_value": max(values) if values else 0,
"generated_at": datetime.now().isoformat()
}
daily_job = define_asset_job(
"daily_pipeline",
selection=AssetSelection.all()
)
daily_schedule = ScheduleDefinition(
job=daily_job,
cron_schedule="0 6 * * *",
)
defs = Definitions(
assets=[raw_data, cleaned_data, analysis_report],
jobs=[daily_job],
schedules=[daily_schedule],
)
EOF
五、编写 Docker Compose 配置
cat > /opt/dagster/docker-compose.yml <<'EOF'
version: '3.8'
services:
dagster-db:
image: postgres:15
restart: always
environment:
POSTGRES_DB: dagster
POSTGRES_USER: dagster
POSTGRES_PASSWORD: your_db_password_here
volumes:
- dagster_db_data:/var/lib/postgresql/data
dagster-webserver:
image: dagster/dagster-k8s:latest
restart: always
command: dagster-webserver -h 0.0.0.0 -p 3000 -w workspace.yaml
ports:
- "3000:3000"
environment:
DAGSTER_HOME: /opt/dagster/dagster_home
volumes:
- ./dagster_home:/opt/dagster/dagster_home
- ./user_code:/opt/dagster/user_code
depends_on:
- dagster-db
dagster-daemon:
image: dagster/dagster-k8s:latest
restart: always
command: dagster-daemon run
environment:
DAGSTER_HOME: /opt/dagster/dagster_home
volumes:
- ./dagster_home:/opt/dagster/dagster_home
- ./user_code:/opt/dagster/user_code
depends_on:
- dagster-db
volumes:
dagster_db_data:
EOF
创建 workspace 配置:
cat > /opt/dagster/workspace.yaml <<'EOF'
load_from:
- python_file:
relative_path: user_code/definitions.py
working_directory: /opt/dagster
EOF
六、启动服务
cd /opt/dagster
docker compose up -d
查看日志确认启动成功:
docker compose logs -f dagster-webserver
访问 http://your_server_ip:3000 进入 Dagit(Dagster 的 Web UI)。在 Assets 页面可以看到已定义的数据资产及其依赖关系图。
七、配置反向代理
cat > /etc/nginx/sites-available/dagster <<'EOF'
server {
listen 80;
server_name your_domain.com;
location / {
proxy_pass http://127.0.0.1:3000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
EOF
ln -s /etc/nginx/sites-available/dagster /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com
八、使用 Dagit 界面
- Asset Graph:可视化展示数据资产之间的依赖关系。
- Materialize:手动触发数据资产的物化(执行)。
- Runs:查看所有执行记录和日志。
- Schedules:管理定时调度任务。
- Sensors:管理基于事件的触发器。
九、常见问题
Webserver 启动失败
检查数据库连接配置和 dagster.yaml 文件格式:
docker compose logs dagster-webserver
Daemon 未执行调度任务
确保 Daemon 进程正常运行,并在 Dagit 界面中确认 Schedule 已开启。
总结
Dagster 以数据资产为中心的编排理念非常适合现代数据团队,特别是在需要管理复杂数据血缘关系的场景中。如果你更偏好传统的任务调度模式,可以参考 Apache Airflow 或 Prefect。选购搬瓦工 VPS 使用优惠码 NODESEEK2026 可享 6.77% 折扣。