foreach_batch_sink

@dp.foreach_batch_sink() デコレーターは、カスタム ロジックを使用してPythonで処理する一連のマイクロ バッチとしてストリームを処理する ForEachBatch シンクを定義します。 変換されたデータを書き込む追加フローでシンクをtargetとして参照します。 概念ガイダンス、考慮事項、および例については、「 ForEachBatch を使用してパイプライン内の任意のデータ シンクに書き込む」を参照してください。

Syntax

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.

Parameters

パラメーター Description
name 省略可能。 パイプライン内のシンクを識別する一意の名前。 含まれていない場合は、自動的に UDF の名前が既定として使用されます。
batch_handler これは、マイクロバッチごとに呼び出されるユーザー定義関数 (UDF) です。
df 現在のマイクロバッチのデータを含む Spark DataFrame。
batch_id マイクロバッチの整数 ID。 Spark は、トリガー間隔ごとにこの ID をインクリメントします。
batch_id0は、ストリームの開始または完全な更新の開始を表します。 foreach_batch_sink コードは、ダウンストリーム データ ソースの完全な更新を適切に処理する必要があります。 詳細については、「 完全更新」を参照してください。