構造化ストリーミング状態情報の読み取り

DataFrame 操作または SQL テーブル値関数を使用して、構造化ストリーミング状態のデータとメタデータに対してクエリを実行できます。 これらの関数を使用して、構造化ストリーミングステートフル クエリの状態情報を監視します。これは、監視とデバッグに役立ちます。

状態データまたはメタデータのクエリを実行するには、ストリーミングのクエリに対するチェックポイント パスの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 状態情報のクエリには、バッチ読み取りセマンティクスのみを使用できます。

Lakeflow Spark 宣言パイプライン、ストリーミング テーブル、または具体化されたビューの状態情報を照会することはできません。 標準アクセス モードで構成されたサーバーレス コンピューティングまたはコンピューティングを使用して状態情報を照会することはできません。

要求事項

  • 次のいずれかのコンピューティング構成を使用します。
    • 標準アクセスモードで構成されたコンピューティング上の Databricks Runtime バージョン16.3以降。
    • 専用アクセス モードまたは分離アクセス モードなしで構成されたコンピューティング上の Databricks Runtime 14.3 LTS 以降。
  • ストリーミングクエリで使用されるチェックポイントパスに対する読み取り権限。

構造化ストリーミング状態ストアの読み取り

サポートされている Databricks Runtime で実行される構造化ストリーミングのクエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

状態リーダー API のオプションとスキーマ

statestore形式オプションの完全な一覧については、「状態ストア」を参照してください。

出力データには、次のスキーマがあります。

タイプ 説明
key Struct (状態キーから派生した型) 状態チェックポイント内のステートフル演算子のレコードのキー。
value Struct (状態値から派生した型) 状態チェックポイント内のステートフル演算子のレコードの値。
partition_id 整数型 状態チェックポイントの一部である、ステートフル演算子のレコードを含むパーティション。

Databricks Runtime 16.4 LTS 以降では、 readChangeFeed オプションが true に設定されている場合、出力データには次のスキーマがあります。

タイプ 説明
batch_id 長い 状態変更が属するバッチ ID。
change_type バッチによって適用される変更の種類: 挿入と更新の update 、削除 delete
key Struct (状態キーから派生した型) 状態チェックポイント内のステートフル演算子のレコードのキー。
value Struct (状態値から派生した型) 状態チェックポイント内のステートフル演算子のレコードの値。 null change_typedelete であるレコードの場合。
partition_id 整数型 状態チェックポイントの一部である、ステートフル演算子のレコードを含むパーティション。

テーブル値関数read_statestore参照してください。

構造化ストリーミング状態の変更の読み取り

Databricks Runtime 16.4 LTS 以降で使用できます。 1 つのマイクロバッチで完全な状態を表示するのではなく、マイクロバッチ全体で状態がどのように変化するかを確認するには、 readChangeFeedtrue に設定し、 changeStartBatchIdを指定します。 必要に応じて、 changeEndBatchIdを指定します。 オプションの完全な一覧については、「 状態ストア」を参照してください。

たとえば、バッチ 2 から最新のコミット済みバッチまでの状態変更を読み取る場合は、次のようにします。

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

出力スキーマには、追加の batch_id 列と change_type 列が含まれています。 完全なスキーマについては、「 状態リーダー API のオプションとスキーマ」を参照してください。

構造化ストリーミング状態のメタデータの読み取り

Databricks Runtime 14.3 LTS 以降で使用できます。 構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

返されるデータには次のスキーマがあります。

タイプ 説明
operatorId 整数型 ステートフル ストリーミング演算子の整数 ID。
operatorName ステートフル ストリーミング演算子の名前。
stateStoreName 演算子の状態ストアの名前。
numPartitions 整数型 状態ストアのパーティションの数。
minBatchId 長い 状態のクエリに使用できる最小バッチ ID。
maxBatchId 長い 状態のクエリに使用できる最大バッチ ID。

minBatchIdmaxBatchId によって指定されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。 古いバッチはマイクロ バッチの実行によって自動的にクリーンアップされるため、ここで指定された値が引き続き使用できるとは限りません。

テーブル値関数read_state_metadata参照してください。

例: ストリーム結合の片方をクエリする

ストリーム ストリーム結合の左側でクエリを実行するには、次の構文を使用します。

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

例: 複数のステートフル演算子を使用してストリームの状態ストアを照会する

この例では、状態メタデータ リーダーを使用して、複数のステートフル演算子を使用してストリーミング クエリのメタデータの詳細を収集し、そのメタデータ結果を状態リーダーのオプションとして使用します。

状態メタデータ リーダーは、次の構文例のように、唯一のオプションとしてチェックポイント パスを受け取ります。

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

次の表は、状態ストア メタデータの出力例を示しています。

operatorId オペレーター名 状態ストア名 numPartitions minBatchId maxBatchId
0 ステートストア保存 デフォルト 200 0 13
1 dedupeWithinWatermark(ウォーターマーク内で重複排除) デフォルト 200 0 13

dedupeWithinWatermark演算子の結果を取得するには、次の例のように、operatorId オプションを使用して状態リーダーにクエリを実行します。

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);