AI ランタイムにデータを読み込む

Important

単一ノード タスクの AI ランタイムは パブリック プレビュー段階です。 マルチ GPU ワークロード用の分散トレーニング API は ベータ版のままです。

機械学習とディープ ラーニングワークロード用の AI ランタイムにトレーニング データを読み込みます。 すべてのデータ アクセスは Unity カタログを経由します。Spark Connect を使用して Delta テーブルから表形式のデータを読み取り、大きなデータセットとイメージ、オーディオ、テキストなどの非構造化ファイルの Unity カタログ ボリュームを読み取ります。 マルチエポック トレーニングの場合は、データをローカルにキャッシュして /tmp し、アクセスを高速化します。 Spark Python API を使用してデータを読み込んで変換する方法については、チュートリアルを参照してください。

Unity カタログが必要です。 AI ランタイムのすべてのデータ アクセスは、Unity カタログを経由します。 テーブルとボリュームは Unity カタログに登録され、ユーザーまたはサービス プリンシパルからアクセスできる必要があります。

表形式データを読み込む

Spark Connect を使用して 、Delta テーブルから表形式の機械学習データを読み込みます。

単一ノード トレーニングでは、PySpark メソッドを使用して Apache Spark DataFrames を pandas DataFrames に変換しtoPandas()必要に応じて PySpark メソッドを使用して NumPy 形式に変換できますto_numpy()

Spark Connect は、分析と名前解決を実行時間に遅延させ、コードの動作を変更する可能性があります。 Spark Connect と Spark クラシックの比較を参照してください。

Spark Connect では、Spark SQL、Spark 上の Pandas API、構造化ストリーミング、MLlib (DataFrame ベース) など、ほとんどの PySpark API がサポートされています。 サポートされている最新の API については、 PySpark API リファレンス ドキュメント を参照してください。

その他の制限事項については、「 サーバーレス コンピューティングの制限事項」を参照してください。

ボリュームを使用して大規模な Delta テーブルを読み込む

toPandas()で変換するには大きすぎる大きな差分テーブルの場合は、データを Unity カタログ ボリュームにエクスポートし、PyTorch または Hugging Face を使用して直接読み込みます。

# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

この方法では、トレーニング中の Spark のオーバーヘッドを回避し、単一 GPU と分散トレーニングの両方のワークフローに適しています。

UCVolumeDataset を使用してボリュームから非構造化データを読み込む

Unity カタログ ボリュームに格納されているイメージ、オーディオ、テキスト ファイルなどの非構造化データの場合は、UCVolumeDataset パッケージのserverless_gpu.dataを使用します。 UCVolumeDataset は、最初のアクセス時に各ファイルをボリュームから高速ローカル キャッシュにコピーし、キャッシュされたローカル ファイル パスを生成する PyTorch IterableDataset です。 これは、手動で実装するパフォーマンスとディストリビューションの問題を処理します。

  • ローカル キャッシュ。 ファイルは最初のアクセス時に FUSE マウントからローカル キャッシュ ディレクトリにコピーされ、その後キャッシュから提供されるため、マルチエポック トレーニングではボリュームが再読み取りされません。
  • パーティションの自動分割。 torch.distributedが初期化されると、ファイルはランク間でパーティション分割され、さらにDataLoaderワーカー間で分割されるため、各(rank, worker)ペアは、追加のセットアップなしで重複しないスライスを受け取ります。

UCVolumeDatasetおよびserverless_gpu.data.DataLoaderにはGPU 環境 5以降が必要です。

UCVolumeDataset では、生のローカル ファイル パスが生成されます。 これらのファイルをテンソルに変換するには、パス ストリームを受け取り、指定した解析ロジックを適用する 2 つ目の IterableDataset でラップします。 これにより、I/O と解析の問題が分離されます。

from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF

class ImageDataset(IterableDataset):
    """Decodes each cached file path from UCVolumeDataset into a tensor."""

    def __init__(self, path_dataset: UCVolumeDataset):
        self._path_dataset = path_dataset

    def __iter__(self):
        for local_path in self._path_dataset:
            image = Image.open(local_path).convert("RGB")
            yield TF.to_tensor(image)

path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)

ラッパーは既にキャッシュされているローカル パスを受け取るので、解析手順が FUSE マウントに触れることはありません。 拡張、トークン化、またはフィルター処理用の追加ラッパーを連結できます。

最適なパフォーマンスを得られるように、ストック PyTorch UCVolumeDatasetではなく、serverless_gpu.data.DataLoaderDataLoaderをペアにしてください。 これは、サーバーレス GPU I/O 用にチューニングされ、GPU コンピューティング中にファイルを同時にフェッチおよびキャッシュします。 データ読み込みのパフォーマンスを参照してください。

@distributed デコレーター内でデータを読み込む

分散トレーニングに サーバーレス GPU API を 使用する場合は、 @distributed デコレーター内でデータ読み込みコードを移動します。 データセットのサイズは pickle で許可されている最大サイズを超える可能性があるため、次に示すように、デコレーター内でデータセットを生成することをお勧めします。

from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
    # Load data inside the decorator to avoid pickle serialization issues
    dataset = get_dataset(file_path)
    ...

デコレーター内に UCVolumeDataset を構築すると、反復時に torch.distributed ランク情報が読み取られ、ファイルが複数のランクに自動的にパーティション分割されるため、ファイル ベースのボリューム データの DistributedSampler は必要ありません。

データ読み込みのパフォーマンス

/Workspace ディレクトリと /Volumes ディレクトリは、リモート Unity カタログ ストレージでホストされます。 データセットが Unity カタログに格納されている場合、データの読み込み速度は使用可能なネットワーク帯域幅によって制限されます。 複数のエポックをトレーニングする場合は、このキャッシュを行う UCVolumeDataset を使用することをお勧めします。最初のアクセス時に各ファイルをローカル ストレージにコピーし、その後のローカル コピーからの読み取りを行います。 ボリューム内のデータセットでは、手動の shutil.copytree よりもこちらを使うことを推奨します。というのも、手動の shutil.copytree では、トレーニングでその一部しか使用しない場合でも、最初にディレクトリツリー全体をコピーしてしまうためです。

データセットが大きい場合は、次の手法でスループットを向上させることができます。

  • serverless_gpu.data.DataLoaderを使用してフェッチを並列化します。 これは、サーバーレス GPU I/O 用にチューニングされた 、torch DataLoader のドロップイン サブクラスです。 num_workers の既定値は 6 で、 prefetch_factor は 4 (PyTorch の 0 と 2 と比較) であるため、GPU の計算中にファイルが同時にフェッチおよびキャッシュされます。 また、バッチごとのフェッチタイミングをアクティブな MLflow 実行にログに記録します。これにより、データ読み込みのボトルネックを特定するのに役立ちます。

    from serverless_gpu.data import DataLoader
    
    loader = DataLoader(
        dataset,
        batch_size=32,
        pin_memory=True,
        # num_workers=6, by default
        # prefetch_factor=4, by default
        # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
    )
    

    num_workersUCVolumeDataset 個のスロットにまたがってグローバルストライドを使用してファイルを分割するため、すべてのランクで同じ world_size × num_workers 値を使用する必要があります。 値が一致しない場合、ファイルは重複またはスキップされます。

  • バッチ サイズを増やします。 バッチが大きいほど、より多くのサンプルに対するバッチごとのデータ読み込みオーバーヘッドが償却され、ステップごとのファイル フェッチ操作の数が減ります。 GPU メモリが制限要因である場合は、より大きなバッチ サイズとグラデーションの累積を組み合わせて、有効なバッチ サイズを維持します。

ストリーミング データセット

メモリに収まらない非常に大規模なデータセットの場合は、ストリーミング アプローチを使用します。