Treino distribuído com Ray Train

Important

Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Visualizações . Ver Gerir as pré-visualizações de Azure Databricks.

Este exemplo executa afinação fina distribuída com paralelismo de dados com o TorchTrainer do Ray Train em 8 GPUs H100 num único nó. Um script de arranque inicia um cluster Ray no nó, depois o maquinista do Ray Train lança um trabalhador por GPU, envolve o modelo em DDP e fragmenta automaticamente o conjunto de dados entre os trabalhadores.

Afina um modelo público (Qwen2.5-3B), por isso funciona tal como está sem um token da Hugging Face.

A carga de trabalho faz o seguinte:

  • Faz o carregamento do projeto local com code_source: snapshot.
  • Inicia um nó principal do Ray com as 8 GPUs e, em seguida, executa o controlador do Ray Train.
  • Utiliza ray.train.torch.prepare_model e prepare_data_loader para tratar do encapsulamento DDP, colocação de dispositivos e amostragem distribuída.
  • Regista métricas para MLflow.

Pré-requisitos

Estrutura do projeto

Crie um diretório com os seguintes ficheiros.

ray_train_distributed/
├── train.yaml          # air workload config (inline dependencies + Ray bootstrap)
└── train_ray.py        # Ray Train TorchTrainer driver + per-worker training

Passo 1: Escrever a carga de trabalho em YAML

train.yaml solicita um único GPU_8xH100 nó. As dependências são declaradas em linha sob environment (com a imagem de cliente version), e o command inicia um cluster Ray no nó e depois executa o controlador, pelo que a carga de trabalho não necessita de um ficheiro de dependências separado nem de um script de arranque:

experiment_name: air-ray-train-distributed

environment:
  version: '4'
  dependencies:
    - ray[default,train]>=2.30
    - transformers>=4.45
    - datasets>=3.0
    - huggingface_hub>=0.34
    # The base image ships fsspec 2023.5.0, which is too old for modern
    # huggingface_hub and breaks dataset/model downloads. Pin a newer fsspec.
    - fsspec>=2024.6.1

# 8 H100 on a single node. Ray Train launches one worker per GPU.
compute:
  num_accelerators: 8
  accelerator_type: GPU_8xH100

code_source:
  type: snapshot
  snapshot:
    root_path: .

command: |
  cd $CODE_SOURCE_PATH
  RAY_HEAD_PORT=6379
  GPUS_PER_NODE=${LOCAL_WORLD_SIZE:-8}
  if [ "${NODE_RANK:-0}" = "0" ]; then
    echo "NODE_RANK=0: starting Ray head with $GPUS_PER_NODE GPU(s)..."
    ray start --head --port=$RAY_HEAD_PORT --num-gpus="$GPUS_PER_NODE" --dashboard-host=0.0.0.0
    python train_ray.py
    ray stop
  else
    echo "NODE_RANK=$NODE_RANK: connecting to Ray head at $MASTER_ADDR:$RAY_HEAD_PORT..."
    for i in $(seq 1 12); do
      if ray start --address="$MASTER_ADDR:$RAY_HEAD_PORT" --num-gpus="$GPUS_PER_NODE" --block 2>/dev/null; then
        break
      fi
      echo "Attempt $i failed, retrying in 5s..."
      sleep 5
    done
  fi

max_retries: 0
timeout_minutes: 90
env_variables:
  NCCL_SOCKET_IFNAME: eth0

O inline command inicia uma cabeça Ray com todas as GPUs no nó, executa o driver com python train_ray.py, e depois para o cluster. Inclui também uma ramificação de processamento que se associa ao nó principal, pelo que o mesmo comando continua a funcionar se dimensionar a tarefa para vários nós.

Passo 2: Defina o maquinista do Ray Train

train_ray.py define a train_func que corre em todos os trabalhadores e a main que configura o TorchTrainer para usar todas as GPUs do cluster. prepare_model Envolve o modelo em DDP e move-o para a GPU do trabalhador. prepare_data_loader adiciona um sampler distribuído:

def train_func(config: dict):
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
    model.config.use_cache = False
    model = prepare_model(model)              # DDP wrap + device placement

    loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
    loader = prepare_data_loader(loader)      # distributed sampler + GPU transfer
    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])
    ...
    ray.train.report({"loss": out.loss.item(), "step": step})


def main():
    ray.init(address="auto")
    total_gpus = int(ray.cluster_resources().get("GPU", 0))
    trainer = TorchTrainer(
        train_func,
        train_loop_config={"lr": 2e-5, "batch_size": 4, "max_steps": 100},
        scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
    )
    trainer.fit()

O guião completo está listado no script de treino completo no final desta página.

Passo 3: Submeter a execução

air run -f train.yaml --dry-run
air run -f train.yaml --watch

Passo 4: Inspecionar a pista

air get run <run-id>
air logs <run-id>

O nó principal do Ray e o driver são ambos executados no nó 0, pelo que os registos são transmitidos de um único nó.

Onde os resultados aparecem

Métricas reportadas com ray.train.report e registadas com MLflow aparecem no experimento MLflow nomeado em experiment_name, visíveis na interface do workspace MLflow.

Roteiro de treino completo

A versão completa train_ray.py para copiar e colar:

#!/usr/bin/env python3
"""Distributed data-parallel fine-tuning with Ray Train on a single 8x H100 node.

The workload `command` starts a Ray head with 8 GPUs and runs this script. Ray Train's
TorchTrainer launches one worker per GPU (8 total), wraps the model in DDP, shards
the dataset across workers, and aggregates metrics. Each worker runs `train_func`.

Uses a public model (no Hugging Face token required) so the example runs as-is.
"""

import os

import mlflow
import ray
import ray.train
import torch
from datasets import load_dataset
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer, prepare_data_loader, prepare_model
from torch.utils.data import DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_NAME = "Qwen/Qwen2.5-3B"
DATASET_NAME = "tatsu-lab/alpaca"
MAX_SEQ_LEN = 1024


def build_dataset(tokenizer):
    raw = load_dataset(DATASET_NAME, split="train[:8000]")

    def format_example(row):
        prompt = f"### Instruction:\n{row['instruction']}\n\n"
        if row.get("input"):
            prompt += f"### Input:\n{row['input']}\n\n"
        text = f"{prompt}### Response:\n{row['output']}{tokenizer.eos_token}"
        out = tokenizer(text, truncation=True, max_length=MAX_SEQ_LEN, padding="max_length")
        out["labels"] = out["input_ids"].copy()
        return out

    return raw.map(format_example, remove_columns=raw.column_names)


def train_func(config: dict):
    """Runs on every Ray Train worker (one per GPU)."""
    rank = ray.train.get_context().get_world_rank()

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
    model.config.use_cache = False
    # prepare_model moves the model to this worker's GPU and wraps it in DDP.
    model = prepare_model(model)

    dataset = build_dataset(tokenizer).with_format("torch")
    loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
    # prepare_data_loader injects a DistributedSampler and moves batches to the GPU.
    loader = prepare_data_loader(loader)

    optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

    # AI Runtime injects MLFLOW_RUN_ID and configures the databricks tracking URI on
    # the node, so logging works without DATABRICKS_HOST/TOKEN. Gate on MLFLOW_RUN_ID
    # so the script also runs cleanly off-platform (e.g. locally) where it is unset.
    use_mlflow = rank == 0 and bool(os.environ.get("MLFLOW_RUN_ID"))
    if use_mlflow:
        mlflow.start_run(run_id=os.environ.get("MLFLOW_RUN_ID"))
        mlflow.log_params({"model": MODEL_NAME, "lr": config["lr"], "batch_size": config["batch_size"]})

    model.train()
    step = 0
    for batch in loader:
        out = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )
        out.loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        optimizer.zero_grad()
        step += 1

        ray.train.report({"loss": out.loss.item(), "step": step})
        if use_mlflow:
            mlflow.log_metric("train_loss", out.loss.item(), step=step)
        if step >= config["max_steps"]:
            break

    if use_mlflow:
        mlflow.end_run()


def main():
    ray.init(address="auto")
    total_gpus = int(ray.cluster_resources().get("GPU", 0))
    print(f"Ray cluster ready: {total_gpus} GPU(s)", flush=True)

    trainer = TorchTrainer(
        train_func,
        train_loop_config={"lr": 2e-5, "batch_size": 4, "max_steps": 100},
        scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
        run_config=RunConfig(storage_path="/tmp/ray_results", name="qwen-sft"),
    )
    result = trainer.fit()
    print(f"Training finished. Final metrics: {result.metrics}", flush=True)

    ray.shutdown()


if __name__ == "__main__":
    main()

Recursos adicionais