多くのお客様は、同じAzure Databricks クラスターで複数の構造化ストリーミング クエリを実行します。 このパターンはサポートされていますが、スケーリングの問題やパフォーマンスのボトルネックを回避するために、Databricks ではクラスターあたりのクエリ数を制限することをお勧めします。 サーバーレス コンピューティングでは、Azure Databricksはスケーリングを自動的に管理するため、これらの考慮事項は自動的に処理されます。 ドライバーと Executor のサイズを制御する従来のコンピューティングを使用している場合、このページでは、留意すべき重要なボトルネックとその対処方法について説明します。
Note
Databricks では、インフラストラクチャの複雑さを自動的に管理する新しいストリーミング ワークロードに Lakeflow Spark 宣言パイプラインを使用することをお勧めします。 「Lakeflow Spark 宣言型パイプライン」を参照してください。
同じクラスターで複数のクエリを使用する場合
同じクラスターで複数のストリーミング クエリを実行すると、インフラストラクチャ コストが削減されます。特に、それぞれが専用のコンピューティングを必要としない多数の小さなストリームがある場合です。 重要なトレードオフは共有エラーです。クラスターで障害が発生すると、その上のすべてのストリームが失敗します。 ミッション クリティカルなパイプラインの場合、その共有障害モードは、多くの場合許容できません。
クリティカル ストリームと非クリティカル ストリームを混在するワークロードの場合、Databricks では次のことが推奨されます。
- ビジネスへの影響に基づいて、各ストリームに優先順位を割り当てます。
- より高いコストでも、ミッション クリティカルなストリームを専用クラスターに配置します。
- 優先順位の低いストリームを併配置してコンピューティングを共有し、コストを削減します。
ドライバーのサイジング
ドライバーは共有リソースです。 複数のクエリで、同じ CPU、メモリ、DAG スケジューラ、タスク スケジューラ、およびドライバー側 UDF の実行 (たとえば、 foreachBatch) が共有されます。 多数の同時実行ストリームを実行する場合は、標準の CPU とメモリのプロビジョニング以外の特定のボトルネックを監視します。
- 自動ローダーのオーバーヘッド: ストリームで自動ローダーを使用する場合、ファイルの検出とディレクトリの一覧によってドライバーの負荷が高くなります。
-
OS レベルのリソース制限 (開いているファイル): 1 つのドライバーで大量のファイル ベースのストリーム (
FileStreamSourceや自動ローダーなど) を同時に実行すると、ユーザー レベルのオープン ファイル記述子の制限が使い果たされ、ランダム ストリームエラーが発生する可能性があります。 -
リスナー バスのバックプレッシャ: 同時ストリーミング クエリの数が多い場合、単一の Spark セッションの
StreamingQueryListenerバスでバックプレッシャが発生する可能性があります。 すべてのイベント (onQueryIdleを含む) がこの 1 つのバスに送信され、大規模なイベント バックログによって非同期onQueryProgressハンドラーが大幅に遅延し、クラスターの安定性に影響を与える可能性があります。 -
コストの高いドライバー操作: 大きな結果セットを具体化し、メモリ不足 (OOM) エラーを引き起こすのを避けるために、絶対に必要でない限り、ドライバーで
collect()またはその他の高価な DataFrame 操作を呼び出さないようにします。
ドライバー競合のトラブルシューティング
OOM または競合の問題が原因でドライバーがクラッシュする場合:
- Spark UI でドライバーメトリックを監視します。 CPU、メモリ、またはディスクの使用率が高い場合は、クラスターのコンピューティング設定でドライバーのサイズを調整します。
- 問題が解決しない場合は、コードでメモリ集中型の操作または UDF がドライバーで実行されていないことを確認します。
- これ以上ドライバーを垂直方向にスケーリングできない場合、Databricks では、これらの共有ノードスケーリングのボトルネックを回避するために、複数のクラスターにジョブを分割することを強くお勧めします。
Executor のサイズ設定
同じクラスターで複数のクエリが実行されている場合、すべてのクエリが Executor のタスク スロットを共有します。 1 つのクエリのステージが利用可能なスロットを占有し、他のクエリで遅延や飢餓状態を招く可能性があります。 Spark では、タスク スロットと使用可能なコアの間で 1 対 1 のマッピングが使用されます。 クエリを同時に実行する必要がある場合は、十分なコアが使用可能であることを確認します。
一般に、Executor は、ドライバー ノードよりも多くのメモリ負荷の高い操作を実行する可能性があります。 アプリケーションの負荷を処理するために必要な場合は、Executor JVM とオフヒープ メモリ割り当てパラメーターを調整します。 Executor ノードのサイズが CPU、メモリ、ディスク領域の観点から適切に設定されていることを確認し、必要に応じて垂直方向にスケーリングします。 垂直方向のスケーリングが不可能な場合は、クラスターにワーカー ノードを追加することを検討してください。
Note
これらの変更の一部では、クラスターを再起動して有効にする必要がある場合があります。
スケジューラプールを使用する
同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピューティング容量を割り当てるスケジューラ プールを構成できます。
既定では、ノートブックで開始されたクエリはすべて、同じ公平なスケジュール プールで実行されます。 ノートブック内のすべてのストリーミング クエリのトリガーによって生成された Apache Spark ジョブは、"先入れ先出し" (FIFO) 順に順番に実行されます。 これにより、クラスター リソースが効率的に共有されないため、クエリに不必要な遅延が発生する可能性があります。
スケジューラ プールを使用すると、コンピューティング リソースを共有する構造化ストリーミング クエリを宣言できます。
次の例では、専用プールに query1 を割り当てますが、 query2 と query3 はスケジューラ プールを共有します。
:::note サーバーレスの互換性
Databricks は、Databricks サーバーレス コンピューティング アーキテクチャと互換性がないため、 spark.sparkContext から移行することをお勧めします。 代わりに、 spark (SparkSession) を直接使用してください。 スケジューラ プールはクラシック コンピューティングの概念です。サーバーレスでは、Databricks によってスケーリングとリソースの割り当てが自動的に管理されます。
:::
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Note
ローカル プロパティの構成は、ストリーミング クエリを開始するノートブック セルと同じである必要があります。
フェア スケジューラ プールの詳細については、 Apache Spark Fair Scheduler のドキュメントを参照してください。
ステートフル クエリに関する考慮事項
同じクラスターで実行されているステートフル クエリの場合は、次の点に注意してください。
- OOM の問題や GC の一時停止を回避するには、状態ストア プロバイダーとして RocksDB を使用します。 RocksDB は、Databricks Runtime 17.3 以降の既定の状態ストア プロバイダーです。 「Azure Databricks で RocksDB 状態ストアを構成する」をご覧ください。
- アプリケーションの要件に合わせてシャッフル パーティションを調整します。 ステートフル ステージの場合、Spark はシャッフル パーティションの数に比例してタスクをスケジュールします。
- ノードごとに RocksDB のメモリ使用量を上限にして、ヒープ外のメモリ使用量による OOM エラーを回避します。 これは Databricks Runtime 17.3 以降では自動的に処理されますが、以前のリリースでは手動で構成する必要があります。 Cap RocksDB のメモリ使用量を参照してください。
- 同じ Executor ノードにパックするパーティションが多くなりすぎないようにします。 スナップショットのアップロードやクリーンアップなど、状態ストアのメンテナンス操作はノードごとに実行されます。 1 つの Executor ノードに割り当てるパーティションが多すぎると、完全なスナップショットが少なくなっているため、メンテナンスが不足し、復旧時間が長くなる可能性があります。