Lakeflow Spark 宣言パイプライン (SDP) は、 AUTO CDC API と AUTO CDC FROM SNAPSHOT API を使用して変更データ キャプチャ (CDC) を簡略化します。 これらの API は、CDC フィードまたはデータベース スナップショットから、緩やかに変化するディメンション (SCD) タイプ 1 とタイプ 2 の計算の複雑さを自動化します。
AUTO CDC API は、2 つの時間軸にまたがる変更を記録するバイテンポラル追跡 (ベータ) もサポートしています。 SCD タイプ 1 とタイプ 2 の詳細については、「 データ キャプチャとスナップショットの変更」を参照してください。 バイテンポラル追跡の詳細については、「Bitemporal AUTO CDC の仕組み」を参照してください。
注
AUTO CDC API は、APPLY CHANGES API を置き換え、同じ構文を持ちます。
APPLY CHANGES API は引き続き使用できますが、Databricks では、AUTO CDC API を代わりに使用することをお勧めします。
使用する API は、変更データのソースによって異なります。
-
AUTO CDC: ソース データベースで CDC フィードが有効になっている場合に使用します。AUTO CDCは、変更データ フィード (CDF) からの変更を処理します。 これは、パイプライン SQL インターフェイスと Python インターフェイスの両方でサポートされています。 -
AUTO CDC FROM SNAPSHOT: ソース データベースで CDC が有効ではなく、スナップショットのみを使用できる場合に使用します。 この API は、スナップショットを比較して変更を決定し、それらを処理します。 Python インターフェイスでのみサポートされます。
どちらの API でも、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされています。
- SCD タイプ 1 を使用して、レコードを直接更新します。 更新されたレコードの履歴は保持されません。
- SCD タイプ 2 を使用して、すべての更新または指定された列セットの更新時にレコードの履歴を保持します。
AUTO CDC のみで、SCD Type 2 の履歴を拡張し、ビジネス時間とシステム時間という 2 つの時間軸にまたがる変更を追跡できるバイテンポラル ストレージを使用することもできます。 Bitemporal は ベータ版です。
「Bitemporal AUTO CDC のしくみ」を参照してください。
AUTO CDC API は、Apache Spark 宣言パイプラインではサポートされていません。
構文とその他の参照については、 AUTO CDC INTO (パイプライン)、 create_auto_cdc_flow、 およびcreate_auto_cdc_from_snapshot_flowに関するページを参照してください。
注
このページでは、ソース データの変更に基づいてパイプライン内のテーブルを更新する方法について説明します。 デルタ テーブルの行レベルの変更情報を記録およびクエリする方法については、「Azure Databricksでの変更データ フィードの使用」を参照してください。
Requirements
CDC API を使用するには、 サーバーレス SDP または SDPPro または Advanceditions を使用するようにパイプラインを構成する必要があります。
AUTO CDC のしくみ
AUTO CDCを使用して CDC 処理を実行するには、ストリーミング テーブルを作成し、SQL の AUTO CDC ... INTO ステートメントまたは Python の create_auto_cdc_flow() 関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 シーケンス処理と SCD ロジックのしくみの詳細については、 データ キャプチャとスナップショットの変更に関する記事を参照してください。
AUTO CDC の例を参照してください。
変更フィードを含むソースからの初期ハイドレートの場合は、AUTO CDC フローでonceを使用し、変更フィードの処理を続行します。
AUTO CDC を使用した外部 RDBMS テーブルのレプリケートに関するページを参照してください。
構文の詳細については、 AUTO CDC INTO (パイプライン) またはcreate_auto_cdc_flowに関するページを参照 してください。
スナップショットからの自動 CDC のしくみ
AUTO CDC FROM SNAPSHOT は、順番にスナップショットを比較することで、ソース データの変更を決定します。 Python パイプライン インターフェイスでのみサポートされます。 差分テーブル、クラウド ストレージ ファイル、または JDBC からスナップショットを直接読み取ることができます。
AUTO CDC FROM SNAPSHOTを使用して CDC 処理を実行するには、ストリーミング テーブルを作成し、create_auto_cdc_from_snapshot_flow()関数を使用してスナップショット、キー、およびその他の引数を指定します。 2 つのインジェスト パターンの詳細と、それぞれを使用するタイミングについては、「 スナップショット処理パターン」を参照してください。
AUTO CDC FROM SNAPSHOT の例を参照してください。
構文の詳細については、 create_auto_cdc_from_snapshot_flowを参照してください。
Bitemporal AUTO CDC の仕組み
Important
Bitemporal AUTO CDC は ベータ版です。
SCD タイプ1とタイプ2は単一時制であり、単一の時間軸に沿った変更を追跡します。 Bitemporal は SCD Type 2 の履歴を拡張して、2 つの時間ディメンション間の変更を追跡し、2 つのパースペクティブを区別します。
- 営業時間: イベントが実際に発生したとき。
- システム時刻: システムがイベントを記録または取り込んだ時刻。
SCD タイプ 2 と同様に、bitemporal はレコードの完全な履歴を保持します。 2 つ目のタイムラインが追加されるため、データの表示内容と、過去の任意の時点でシステムが信じていた内容の両方を再構築できます。
たとえば、ヘッジファンドは、ソースシステムから株式データを取り込みます。 アクメ社の株価は1月1日に変更されるが、ファンドはその更新を1月5日まで取り込まない。 Bitemporal AUTO CDCを使用すると、Acme Corpの実際の株価は1月1日(営業時間)と、ファンドが1月3日(システム時間)に取引決定を行ったときにシステムが信じた価格という2つの異なる質問に答えます。 これらのタイムラインを区別する機能は、監査、規制レポート、財務上の意思決定に役立ちます。
二重時制処理を有効にするには、STORED AS BITEMPORAL(SQL)または stored_as_scd_type="bitemporal"(Python)を設定し、ビジネス時間列には SEQUENCE BY を使用し、システム時間列には SYSTEM SEQUENCE BY を使用します。 ターゲット テーブルでは、SCD Type 2 の__SYSTEM_START_AT列と__SYSTEM_END_AT列の横に、__START_AT列と__END_AT列が追加されます。 構文の詳細については、 AUTO CDC INTO (パイプライン) またはcreate_auto_cdc_flowに関するページを参照 してください。
挿入、更新、順不同の更新、および削除が二重時制テーブルにどのような影響を与えるかを手順を追って説明した解説については、Bitemporal AUTO CDC examplesを参照してください。
シーケンス処理に複数の列を使用する
複数の列 (タイムスタンプと ID を使用してタイを解除する場合など) を順番に並べるには、 STRUCT を使用してそれらを組み合わせます。 API は最初のフィールドを優先して並べ替えを行い、値が同じ場合は 2 番目のフィールドを基準にし、その後も同様に順次判断します。
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
AUTO CDC の例
次の例は、変更データ フィード ソースを使用した SCD タイプ 1 およびタイプ 2 の処理を示しています。 サンプル データは、新しいユーザー レコードを作成し、ユーザー レコードを削除して、ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作は到着が遅れ、ターゲット テーブルから削除され、順不同のイベント処理が示されています。
これらの例で使用される入力レコードを次に示します。 このデータは、「サンプル データの作成」セクションでクエリを実行することによって 作成 されます。
| userId | 名前 | city | 操作 | シーケンス番号 |
|---|---|---|---|---|
| 124 | ラウル | Oaxaca | INSERT | 1 |
| 123 | Isabel | モンテレー | INSERT | 1 |
| 125 | メルセデス | ティファナ | INSERT | 2 |
| 126 | リリー | カンクン | INSERT | 2 |
| 123 | null 値 | null 値 | DELETE | 6 |
| 125 | メルセデス | Guadalajara | UPDATE | 6 |
| 125 | メルセデス | メヒカリ | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
サンプル データ生成クエリの最後の行のコメントを解除すると、テーブルを切り捨てる (テーブルをクリアする) ことを指定する次のレコードが sequenceNum=3に挿入されます。
| userId | 名前 | city | 操作 | シーケンス番号 |
|---|---|---|---|---|
| null 値 | null 値 | null 値 | 切り捨てる | 3 |
注
次の例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれは省略可能です。
サンプル データを作成する
次のステートメントを実行して、サンプル データセットを作成します。 このコードは、パイプライン定義の一部として実行されるものではありません。 変換フォルダーではなく、パイプラインの探索フォルダーから実行します。
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
SCD タイプ 1 の更新を処理する
SCD タイプ 1 では、各レコードの最新バージョンのみが保持されます。 次の例では、上記で作成した変更データ フィードから読み取り、変更をストリーミング テーブル ターゲットに適用します。 このコードを実行するには、パイプラインとは何ですか?。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD タイプ 1 の例を実行すると、ターゲット テーブルには次のレコードが含まれます。
| userId | 名前 | city |
|---|---|---|
| 124 | ラウル | Oaxaca |
| 125 | メルセデス | Guadalajara |
| 126 | リリー | カンクン |
ユーザー 123 (Isabel) が削除され、表示されません。 SCD Type 1 は以前の値を上書きするため、ユーザー 125 (メルセデス) には最新の市区町村 (グアダラハラ) のみが表示されます。
sequenceNum=5 の以前の UPDATE は、後に sequenceNum=6 で更新プログラムが利用可能になるため、削除されました。
TRUNCATE レコードをコメント解除してこの例を実行すると、テーブルは sequenceNum=3 でクリアされます。 つまり、レコードの 124 と 126 はテーブルに含まれていないので、最終的なターゲット テーブルには次のレコードのみが含まれます。
| userId | 名前 | city |
|---|---|---|
| 125 | メルセデス | Guadalajara |
SCD タイプ 2 の更新を処理する
SCD Type 2 では、レコードの各バージョンに対して新しい行を作成し、各バージョンがアクティブだったタイミングを示す __START_AT 列と __END_AT 列を使用して、変更の完全な履歴を保持します。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD Type 2 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。
| userId | 名前 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | モンテレー | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | ラウル | Oaxaca | 1 | null 値 |
| 125 | メルセデス | ティファナ | 2 | 5 |
| 125 | メルセデス | メヒカリ | 5 | 6 |
| 125 | メルセデス | Guadalajara | 6 | null 値 |
| 126 | リリー | カンクン | 2 | null 値 |
テーブルは完全な履歴を保持します。 ユーザー 123 には 2 つのバージョンがあります (削除されるとシーケンス 6 で終了)。 ユーザー 125 には、市区町村の変更を示す 3 つのバージョンがあります。
__END_AT = nullを含むレコードは現在アクティブです。
SCD タイプ 2 で列のサブセットを追跡する
既定では、SCD Type 2 では、列の値が変更されるたびに新しいバージョンが作成されます。 追跡する列のサブセットを指定すると、新しい履歴レコードを生成するのではなく、他の列に対する変更によって現在のバージョンが更新されます。
次の例では、履歴追跡から city 列を除外します。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
cityの変更は追跡されないため、新しいバージョンを作成する代わりに、市区町村の更新によって現在の行が上書きされます。 ターゲット テーブルには、次のレコードが含まれています。
| userId | 名前 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | ラウル | Oaxaca | 1 | null 値 |
| 125 | メルセデス | Guadalajara | 2 | null 値 |
| 126 | リリー | カンクン | 2 | null 値 |
スナップショットによるAUTO CDCの例
次のセクションでは、 AUTO CDC FROM SNAPSHOT を使用してスナップショットを SCD Type 1 または Type 2 ターゲット テーブルに処理する例を示します。 この API を使用する場合の背景については、「 データ キャプチャとスナップショットの変更」を参照してください。
例: パイプライン インジェスト時間を使用してスナップショットを処理する
この方法は、スナップショットが定期的かつ順番に到着し、バージョン管理のためにパイプライン実行タイムスタンプに依存できる場合に使用します。 パイプラインの更新ごとに新しいスナップショットが取り込まれます。
差分テーブル、クラウド ストレージ ファイル、JDBC 接続など、複数のソースの種類からスナップショットを読み取ることができます。
手順 1: サンプル データを作成する
スナップショット データを含むテーブルを作成します。 パイプラインの explorations フォルダーにあるノートブックまたは Databricks SQL から次のコードを実行します。
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
手順 2: スナップショットから AUTO CDC を実行する
パイプラインとは この手順でコードを実行します。
スナップショット ビューのソースの種類を選択します (サンプル作成コードでは Delta テーブルが生成されます)。
オプション A: Delta テーブルから読み取る
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
オプション B: クラウド ストレージから読み取る
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
オプション C: JDBC から読み取る (クラシック コンピューティングのみ)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
すべてのオプション、ターゲットに書き込む
次に、ターゲット テーブルとフローを追加します。
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
最初のパイプラインの実行後、すべてのレコードがアクティブな行として挿入されます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | null 値 |
| 2 | モンテレー | 0 | null 値 |
| 3 | ティファナ | 0 | null 値 |
注
代わりに SCD Type 1 を使用し、現在の状態のみを保持するには、 stored_as_scd_type=1設定します。 この場合、ターゲット テーブルには __START_AT 列と __END_AT 列は含まれません。
手順 3: 新しいスナップショットをシミュレートして再実行する
ソース テーブルを更新して、到着する新しいスナップショットをシミュレートします (pipline の explorations フォルダーにあるノートブックまたは SQL ファイルからこのコードを実行します)。
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
パイプラインを再実行します。
AUTO CDC FROM SNAPSHOT は、新しいスナップショットを前のスナップショットと比較し、ユーザー 1 が削除され、ユーザー 2 と 3 が更新され、ユーザー 4 と 6 が挿入されたことを検出します。 これにより変更フィードが生成され、 AUTO CDC を使用して出力テーブルが作成されます。
SCD タイプ 2 を使用した 2 回目の実行後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 0 | 1 |
| 2 | モンテレー | 0 | 1 |
| 2 | カーメル | 1 | null 値 |
| 3 | ティファナ | 0 | 1 |
| 3 | ロサンゼルス | 1 | null 値 |
| 4 | デスバレー | 1 | null 値 |
| 6 | キングスキャニオン | 1 | null 値 |
ユーザー 1 は終了 (削除) されました。 ユーザー 2 と 3 にはそれぞれ、都市の変更を示す 2 つのバージョンがあります。 ユーザー 4 と 6 が新しく挿入されました。
SCD タイプ 1 で 2 回目の実行が行われた後、ターゲット テーブルには現在の状態のみが表示されます。
| userId | city |
|---|---|
| 2 | カーメル |
| 3 | ロサンゼルス |
| 4 | デスバレー |
| 6 | キングスキャニオン |
例: バージョン関数を使用してスナップショットを処理する
スナップショットの順序を明示的に制御する必要がある場合は、この方法を使用します。 たとえば、複数のスナップショットが同時に到着する場合や、スナップショットが順不同で到着する場合に、この方法を使用します。 次に処理するスナップショットとそのバージョン番号を指定する関数を記述します。 API は、バージョンの昇順でスナップショットを処理します。
- 複数のスナップショットがストレージ内にある場合、それらはすべて順番に処理されます。
- スナップショットが順序に誤って到着した場合 (たとえば、
snapshot_3後に到着snapshot_4)、スナップショットはスキップされます。 - 新しいスナップショットがない場合、関数は
Noneを返し、処理は行われません。
手順 1: スナップショット ファイルを準備する
スナップショット データを含む CSV ファイルを作成し、ボリュームまたはクラウドストレージの場所に追加します。 ファイルに時系列で名前を付けます (たとえば、 snapshot_1.csv、 snapshot_2.csv)。
各ファイルには、 userId と cityの列が含まれている必要があります。 例えば次が挙げられます。
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | Oaxaca |
| 2 | モンテレー |
| 3 | ティファナ |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | カーメル |
| 3 | ロサンゼルス |
| 4 | デスバレー |
手順 2: バージョン関数を使用して AUTO CDC FROM SNAPSHOT を実行する
新しいノートブックを作成し、次のパイプライン コードを貼り付けます。 次に、パイプラインとは何ですか?
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
注
代わりに SCD Type 1 を使用するには、 stored_as_scd_type=1設定します。
snapshot_1.csv処理後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | null 値 |
| 2 | モンテレー | 1 | null 値 |
| 3 | ティファナ | 1 | null 値 |
snapshot_2.csv処理後、ターゲット テーブルには次のレコードが含まれます。
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | Oaxaca | 1 | 2 |
| 2 | モンテレー | 1 | 2 |
| 2 | カーメル | 2 | null 値 |
| 3 | ティファナ | 1 | 2 |
| 3 | ロサンゼルス | 2 | null 値 |
| 4 | デスバレー | 2 | null 値 |
注
SCD タイプ 1 の場合、テーブルは最新のスナップショットとまったく同じになります。 違いは、ダウンストリーム クエリで変更フィードを使用して、変更されたレコードのみを処理できることです。
手順 3: 新しいスナップショットを追加する
変更されたデータ (変更された市区町村の値、新しい行、削除された行など) を含む新しい CSV ファイルを保存場所に追加します。 次に、パイプラインをもう一度実行して、新しいスナップショットを処理します。
Bitemporal AUTO CDC の例
Important
Bitemporal AUTO CDC は ベータ版です。
次の例では、少数の合成 CDC イベントを使って、二重時間ターゲット テーブルを作成します。
bt列には業務時間があり、st列にはシステム時刻があります。
Python
from pyspark import pipelines as dp
# Source: synthetic CDC events
dp.create_streaming_table(name="cdc_source")
@dp.append_flow(target="cdc_source", once=True)
def load_cdc_source():
return spark.createDataFrame(
[
(1, "x10", "y10", 10, 100),
(1, "x20", "y20", 20, 200)
],
schema="id INT, x STRING, y STRING, bt INT, st INT",
)
# Target: bitemporal table
dp.create_streaming_table(name="target_bitemporal")
dp.create_auto_cdc_flow(
target = "target_bitemporal",
source = "cdc_source",
keys = ["id"],
sequence_by = "bt",
system_sequence_by = "st",
stored_as_scd_type = "bitemporal"
)
SQL
-- Source: synthetic CDC events
CREATE OR REFRESH STREAMING TABLE cdc_source_sql;
CREATE FLOW cdc_source_sql AS INSERT INTO ONCE
cdc_source_sql BY NAME
SELECT * FROM VALUES
(1, 'x10', 'y10', 10, 100),
(1, 'x20', 'y20', 20, 200)
AS t(id, x, y, bt, st);
-- Target: bitemporal table
CREATE OR REFRESH STREAMING TABLE target_bitemporal_sql;
CREATE FLOW target_bitemporal_sql AS AUTO CDC INTO
target_bitemporal_sql
FROM
stream(cdc_source_sql)
KEYS
(id)
SEQUENCE BY
bt
SYSTEM SEQUENCE BY
st
STORED AS
BITEMPORAL;
次の手順では、二重時間テーブルが 1 社の企業について、挿入、更新、順不同の更新、および削除をどのように記録するかを説明します。 シーケンス列は __START_AT 列と __END_AT 列 (業務時間) 列を生成し、システムシーケンス列は __SYSTEM_START_AT 列と __SYSTEM_END_AT (システム時刻) 列を生成します。
| Column | Description |
|---|---|
__START_AT |
この行が有効になった業務時刻。 |
__END_AT |
この行の有効性が終了する業務時刻。
null 有効な場合は無期限です。 |
__SYSTEM_START_AT |
この行のデータとビジネス時間区間が有効であることが確認されているシステム時刻。 |
__SYSTEM_END_AT |
この行のデータと業務時間間隔が無効であることがわかっているシステム時刻。
null が無期限に true であることがわかっている場合は〘。 |
システムは、両方のタイムラインで任意の順序で到着するイベントを処理します。 既に処理されているイベントよりも早い業務時間またはシステム時間でイベントが到着すると、システムは、末尾にのみ追加するのではなく、影響を受ける履歴を修正します。
手順 1: 挿入
A 社は 2025 年 7 月 18 日 10:01:00 (営業時間) に追加されますが、10:05:00 (システム時刻) まで取り込まれません。
入力:
| CompanyId | データポイント | 優先順位 | システムのシーケンス | Operation |
|---|---|---|---|---|
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 10:05:00 | INSERT |
Output:
| CompanyId | データポイント | __START_AT | __END_AT | __SYSTEM_START_AT | __SYSTEM_END_AT |
|---|---|---|---|---|---|
| A | XFv1 | 7/18/2025 10:01:00 | NULL | 7/18/2025 10:05:00 | NULL |
XFv1 は 10:01:00 以降で有効であり、既知の終了はありません。 システムはこの事実をシステム時刻 10:05:00 に学習し、終了は不明です。
手順 2: 更新
A 社は 2025 年 7 月 18 日 12:15:43 (営業時間) に更新され、システムは 12:20:00 (システム時間) にイベントを使用します。 システムは、更新プログラムが認識される前に信じていたものと、更新プログラムが取り込まれた後の修正されたビジネス履歴の両方を保持します。
入力:
| CompanyId | データポイント | 優先順位 | システムのシーケンス | Operation |
|---|---|---|---|---|
| A | XFv2 | 7/18/2025 12:15:43 | 7/18/2025 12:20:00 | UPDATE |
Output:
| CompanyId | データポイント | __START_AT | __END_AT | __SYSTEM_START_AT | __SYSTEM_END_AT |
|---|---|---|---|---|---|
| A | XFv1 | 7/18/2025 10:01:00 | NULL | 7/18/2025 10:05:00 | 7/18/2025 12:20:00 |
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 12:15:43 | 7/18/2025 12:20:00 | NULL |
| A | XFv2 | 7/18/2025 12:15:43 | NULL | 7/18/2025 12:20:00 | NULL |
XFv1 は 10:01:00 から有効であり、終了時刻は不明であると考えられており、システムは 10:05:00 から 12:20:00 までそのように認識していました。 XFv1 は、12:15:43 までのみ有効であることが判明している、システム時刻 12:20:00 から有効となる既知の終了時刻がない修正済み履歴です。 XFv2 は 12:15:43 から有効であり、既知の終了がなく、システム時刻 12:20:00 に学習されました。
手順 3: 順不同の更新
会社 A が実際には 2025 年 7 月 18 日 12:05:00(業務時刻)に更新されたことを示す順序外の更新が到着しますが、12:25:00(システム時刻)まで取り込まれません。 更新がシステム時間上では後から利用可能であっても、営業時刻ではそれ以前の時点に属する場合、システムは過去のビジネス時刻の履歴を修正するとともに、順序どおりでない更新を受け取る前の履歴と、修正後の履歴の両方を保持します。
入力:
| CompanyId | データポイント | 優先順位 | システムのシーケンス | Operation |
|---|---|---|---|---|
| A | XFv3 | 7/18/2025 12:05:00 | 7/18/2025 12:25:00 | UPDATE |
Output:
| CompanyId | データポイント | __START_AT | __END_AT | __SYSTEM_START_AT | __SYSTEM_END_AT |
|---|---|---|---|---|---|
| A | XFv1 | 7/18/2025 10:01:00 | NULL | 7/18/2025 10:05:00 | 7/18/2025 12:20:00 |
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 12:15:43 | 7/18/2025 12:20:00 | 7/18/2025 12:25:00 |
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 12:05:00 | 7/18/2025 12:25:00 | NULL |
| A | XFv3 | 7/18/2025 12:05:00 | 7/18/2025 12:15:43 | 7/18/2025 12:25:00 | NULL |
| A | XFv2 | 7/18/2025 12:15:43 | NULL | 7/18/2025 12:20:00 | NULL |
XFv1 は 10:01:00 から 12:15:43 まで有効であると考えられ、その信念はシステム時刻の 12:25:00 まで有効になりました。 この新しい更新により、XFv1 のビジネス上の有効期間の終了時刻が 12:05:00 に修正され、これはシステム時刻 12:25:00 から有効となる修正済み履歴です。 XFv3 は、12:05:00 から 12:15:43 まで有効であることが現在判明しており、この認識はシステム時刻では 12:25:00 から有効で、終了時刻は不明です。
手順 4: 削除
A 社は 2025 年 7 月 18 日 12:30:00 に削除され、システムは 12:30:00 にイベントを使用します。 削除操作はエンティティのビジネスの存在の終わりを表すので、システムは置換行を作成しません。 XFv2 は 2 つの行に表示され、会社が存在しなくなったときとシステムが削除を知ったときの両方の完全な監査証跡が保持されます。
入力:
| 会社ID | データポイント | 優先順位 | システムのシーケンス | Operation |
|---|---|---|---|---|
| A | XFv2 | 7/18/2025 12:30:00 | 7/18/2025 12:30:00 | DELETE |
Output:
| 会社 ID | データポイント | __START_AT | __END_AT | __SYSTEM_START_AT | __SYSTEM_END_AT |
|---|---|---|---|---|---|
| A | XFv1 | 7/18/2025 10:01:00 | NULL | 7/18/2025 10:05:00 | 7/18/2025 12:20:00 |
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 12:15:43 | 7/18/2025 12:20:00 | 7/18/2025 12:25:00 |
| A | XFv1 | 7/18/2025 10:01:00 | 7/18/2025 12:05:00 | 7/18/2025 12:25:00 | NULL |
| A | XFv3 | 7/18/2025 12:05:00 | 7/18/2025 12:15:43 | 7/18/2025 12:25:00 | NULL |
| A | XFv2 | 7/18/2025 12:15:43 | NULL | 7/18/2025 12:20:00 | 7/18/2025 12:30:00 |
| A | XFv2 | 7/18/2025 12:15:43 | 7/18/2025 12:30:00 | 7/18/2025 12:30:00 | NULL |
XFv2 は 12:15:43 から有効で、既知の終了時刻はなく、システムは 12:20:00 から 12:30:00 までそのように認識していました。 削除が取り込まれた後に、XFv2 が有効であることが確認されている期間は 12:30:00 までとなり、システム時刻 12:30:00 から有効な修正済みの履歴が適用されます。
制限事項
- シーケンス列は、並べ替え可能なデータ型である必要があります。
NULLシーケンス値はサポートされていません。 -
AUTO CDC FROM SNAPSHOTは Python パイプライン インターフェイスでのみサポートされます。SQL インターフェイスはサポートされていません。 - AUTO CDC プロセスのターゲットからデータをストリーミングするには、その変更フィードから読み取ります。 詳細については、「 AUTO CDC ターゲット テーブルから変更データ フィードを読み取る」を参照してください。
その他のリソース
- 変更データキャプチャとスナップショット: CDC の概念、スナップショット、および SCD タイプについて学びます。
-
AUTO CDCを使用して外部 RDBMS テーブルをレプリケートする:onceフローで初期ハイドレートを実行し、変更の処理を続行する方法について説明します。 - AUTO CDC の高度なトピック: AUTO CDC ターゲットに対する変更操作、変更データ フィードの読み取り、メトリックの処理について説明します。
- チュートリアル: 変更データ キャプチャを使用して ETL パイプラインを構築する