DataFrame 操作または SQL テーブル値関数を使用して、構造化ストリーミング状態のデータとメタデータに対してクエリを実行できます。 これらの関数を使用して、構造化ストリーミングステートフル クエリの状態情報を監視します。これは、監視とデバッグに役立ちます。
状態データまたはメタデータのクエリを実行するには、ストリーミングのクエリに対するチェックポイント パスの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 状態情報のクエリには、バッチ読み取りセマンティクスのみを使用できます。
注
Lakeflow Spark 宣言パイプライン、ストリーミング テーブル、または具体化されたビューの状態情報を照会することはできません。 標準アクセス モードで構成されたサーバーレス コンピューティングまたはコンピューティングを使用して状態情報を照会することはできません。
要求事項
- 次のいずれかのコンピューティング構成を使用します。
- 標準アクセスモードで構成されたコンピューティング上の Databricks Runtime バージョン16.3以降。
- 専用アクセス モードまたは分離アクセス モードなしで構成されたコンピューティング上の Databricks Runtime 14.3 LTS 以降。
- ストリーミングクエリで使用されるチェックポイントパスに対する読み取り権限。
構造化ストリーミング状態ストアの読み取り
サポートされている Databricks Runtime で実行される構造化ストリーミングのクエリの状態ストア情報を読み取ることができます。 次の構文を使用します。
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
状態リーダー API のオプションとスキーマ
statestore形式オプションの完全な一覧については、「状態ストア」を参照してください。
出力データには、次のスキーマがあります。
| 列 | タイプ | 説明 |
|---|---|---|
key |
Struct (状態キーから派生した型) | 状態チェックポイント内のステートフル演算子のレコードのキー。 |
value |
Struct (状態値から派生した型) | 状態チェックポイント内のステートフル演算子のレコードの値。 |
partition_id |
整数型 | 状態チェックポイントの一部である、ステートフル演算子のレコードを含むパーティション。 |
Databricks Runtime 16.4 LTS 以降では、 readChangeFeed オプションが true に設定されている場合、出力データには次のスキーマがあります。
| 列 | タイプ | 説明 |
|---|---|---|
batch_id |
長い | 状態変更が属するバッチ ID。 |
change_type |
糸 | バッチによって適用される変更の種類: 挿入と更新の update 、削除 delete 。 |
key |
Struct (状態キーから派生した型) | 状態チェックポイント内のステートフル演算子のレコードのキー。 |
value |
Struct (状態値から派生した型) | 状態チェックポイント内のステートフル演算子のレコードの値。
null
change_type が delete であるレコードの場合。 |
partition_id |
整数型 | 状態チェックポイントの一部である、ステートフル演算子のレコードを含むパーティション。 |
テーブル値関数read_statestore参照してください。
構造化ストリーミング状態の変更の読み取り
Databricks Runtime 16.4 LTS 以降で使用できます。 1 つのマイクロバッチで完全な状態を表示するのではなく、マイクロバッチ全体で状態がどのように変化するかを確認するには、 readChangeFeed を true に設定し、 changeStartBatchIdを指定します。 必要に応じて、 changeEndBatchIdを指定します。 オプションの完全な一覧については、「 状態ストア」を参照してください。
たとえば、バッチ 2 から最新のコミット済みバッチまでの状態変更を読み取る場合は、次のようにします。
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
出力スキーマには、追加の batch_id 列と change_type 列が含まれています。 完全なスキーマについては、「 状態リーダー API のオプションとスキーマ」を参照してください。
構造化ストリーミング状態のメタデータの読み取り
Databricks Runtime 14.3 LTS 以降で使用できます。 構造化ストリーミング クエリの状態メタデータ情報を読み取ることができます。
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
返されるデータには次のスキーマがあります。
| 列 | タイプ | 説明 |
|---|---|---|
operatorId |
整数型 | ステートフル ストリーミング演算子の整数 ID。 |
operatorName |
糸 | ステートフル ストリーミング演算子の名前。 |
stateStoreName |
糸 | 演算子の状態ストアの名前。 |
numPartitions |
整数型 | 状態ストアのパーティションの数。 |
minBatchId |
長い | 状態のクエリに使用できる最小バッチ ID。 |
maxBatchId |
長い | 状態のクエリに使用できる最大バッチ ID。 |
注
minBatchId と maxBatchId によって指定されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。 古いバッチはマイクロ バッチの実行によって自動的にクリーンアップされるため、ここで指定された値が引き続き使用できるとは限りません。
テーブル値関数read_state_metadata参照してください。
例: ストリーム結合の片方をクエリする
ストリーム ストリーム結合の左側でクエリを実行するには、次の構文を使用します。
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
例: 複数のステートフル演算子を使用してストリームの状態ストアを照会する
この例では、状態メタデータ リーダーを使用して、複数のステートフル演算子を使用してストリーミング クエリのメタデータの詳細を収集し、そのメタデータ結果を状態リーダーのオプションとして使用します。
状態メタデータ リーダーは、次の構文例のように、唯一のオプションとしてチェックポイント パスを受け取ります。
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
次の表は、状態ストア メタデータの出力例を示しています。
| operatorId | オペレーター名 | 状態ストア名 | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | ステートストア保存 | デフォルト | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark(ウォーターマーク内で重複排除) | デフォルト | 200 | 0 | 13 |
dedupeWithinWatermark演算子の結果を取得するには、次の例のように、operatorId オプションを使用して状態リーダーにクエリを実行します。
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);