搬瓦工部署 Dagster 数据编排平台教程

Dagster 是一款面向数据资产(Software-defined Assets)的现代数据编排平台。与 Airflow 侧重于任务调度不同,Dagster 的核心理念是将数据管道中的每个产出物(表、文件、模型等)都视为一等公民,通过声明式的方式定义数据资产之间的依赖关系。本文将介绍如何在搬瓦工 VPS 上通过 Docker 部署 Dagster。部署前请确保已安装 Docker 和 Docker Compose

一、Dagster 核心概念

  • Asset(数据资产):管道中产出的数据对象,如数据库表、Parquet 文件或 ML 模型。
  • Op(操作):执行具体计算逻辑的函数单元。
  • Job:由多个 Op 组成的可执行管道。
  • Schedule:基于 Cron 表达式的定时调度。
  • Sensor:基于外部事件触发的调度,如文件变更或 API 回调。
  • Resource:外部服务的连接配置,如数据库连接、API 客户端等。
  • IO Manager:管理数据资产的读写逻辑。

二、系统要求

  • 操作系统:Ubuntu 20.04+ 或 Debian 11+。
  • 内存:至少 2GB,推荐 4GB。
  • 磁盘:至少 10GB 可用空间。
  • Docker:已安装 Docker 和 Docker Compose。

选购搬瓦工 VPS 请参考 全部方案,购买时使用优惠码 NODESEEK2026 可享 6.77% 折扣,购买入口:bwh81.net

三、创建项目结构

mkdir -p /opt/dagster/{dagster_home,user_code}
cd /opt/dagster

创建 Dagster 配置文件:

cat > /opt/dagster/dagster_home/dagster.yaml <<'EOF'
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

storage:
  postgres:
    postgres_db:
      hostname: dagster-db
      username: dagster
      password: your_db_password_here
      db_name: dagster

telemetry:
  enabled: false
EOF

四、创建示例数据资产

cat > /opt/dagster/user_code/definitions.py <<'EOF'
from dagster import (
    asset, Definitions, ScheduleDefinition,
    define_asset_job, AssetSelection
)
import json
from datetime import datetime

@asset(description="获取原始数据")
def raw_data():
    data = {
        "timestamp": datetime.now().isoformat(),
        "records": [
            {"id": i, "value": i * 10}
            for i in range(100)
        ]
    }
    return data

@asset(description="清洗后的数据")
def cleaned_data(raw_data):
    records = raw_data["records"]
    cleaned = [r for r in records if r["value"] > 50]
    return {"count": len(cleaned), "data": cleaned}

@asset(description="数据分析报告")
def analysis_report(cleaned_data):
    count = cleaned_data["count"]
    values = [r["value"] for r in cleaned_data["data"]]
    return {
        "total_records": count,
        "avg_value": sum(values) / len(values) if values else 0,
        "max_value": max(values) if values else 0,
        "generated_at": datetime.now().isoformat()
    }

daily_job = define_asset_job(
    "daily_pipeline",
    selection=AssetSelection.all()
)

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",
)

defs = Definitions(
    assets=[raw_data, cleaned_data, analysis_report],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
EOF

五、编写 Docker Compose 配置

cat > /opt/dagster/docker-compose.yml <<'EOF'
version: '3.8'

services:
  dagster-db:
    image: postgres:15
    restart: always
    environment:
      POSTGRES_DB: dagster
      POSTGRES_USER: dagster
      POSTGRES_PASSWORD: your_db_password_here
    volumes:
      - dagster_db_data:/var/lib/postgresql/data

  dagster-webserver:
    image: dagster/dagster-k8s:latest
    restart: always
    command: dagster-webserver -h 0.0.0.0 -p 3000 -w workspace.yaml
    ports:
      - "3000:3000"
    environment:
      DAGSTER_HOME: /opt/dagster/dagster_home
    volumes:
      - ./dagster_home:/opt/dagster/dagster_home
      - ./user_code:/opt/dagster/user_code
    depends_on:
      - dagster-db

  dagster-daemon:
    image: dagster/dagster-k8s:latest
    restart: always
    command: dagster-daemon run
    environment:
      DAGSTER_HOME: /opt/dagster/dagster_home
    volumes:
      - ./dagster_home:/opt/dagster/dagster_home
      - ./user_code:/opt/dagster/user_code
    depends_on:
      - dagster-db

volumes:
  dagster_db_data:
EOF

创建 workspace 配置:

cat > /opt/dagster/workspace.yaml <<'EOF'
load_from:
  - python_file:
      relative_path: user_code/definitions.py
      working_directory: /opt/dagster
EOF

六、启动服务

cd /opt/dagster
docker compose up -d

查看日志确认启动成功:

docker compose logs -f dagster-webserver

访问 http://your_server_ip:3000 进入 Dagit(Dagster 的 Web UI)。在 Assets 页面可以看到已定义的数据资产及其依赖关系图。

七、配置反向代理

cat > /etc/nginx/sites-available/dagster <<'EOF'
server {
    listen 80;
    server_name your_domain.com;

    location / {
        proxy_pass http://127.0.0.1:3000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
EOF

ln -s /etc/nginx/sites-available/dagster /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
certbot --nginx -d your_domain.com

八、使用 Dagit 界面

  • Asset Graph:可视化展示数据资产之间的依赖关系。
  • Materialize:手动触发数据资产的物化(执行)。
  • Runs:查看所有执行记录和日志。
  • Schedules:管理定时调度任务。
  • Sensors:管理基于事件的触发器。

九、常见问题

Webserver 启动失败

检查数据库连接配置和 dagster.yaml 文件格式:

docker compose logs dagster-webserver

Daemon 未执行调度任务

确保 Daemon 进程正常运行,并在 Dagit 界面中确认 Schedule 已开启。

总结

Dagster 以数据资产为中心的编排理念非常适合现代数据团队,特别是在需要管理复杂数据血缘关系的场景中。如果你更偏好传统的任务调度模式,可以参考 Apache AirflowPrefect。选购搬瓦工 VPS 使用优惠码 NODESEEK2026 可享 6.77% 折扣。

关于本站

搬瓦工VPS中文网(bwgvps.com)是非官方中文信息站,整理搬瓦工的方案、优惠和教程。我们不销售主机,不提供技术服务。

新手必读
搬瓦工优惠码

NODESEEK2026(优惠 6.77%)

购买时填入即可抵扣。