Important
Lakeflow Spark 宣言パイプライン (SDP) の環境バージョンは ベータ版です。
environment バージョンが設定されたパイプラインは、Spark Connect を介してPythonコードを実行します。 このページでは、互換性のない内容、動作の違い、影響を受けるパターンのパイプラインをスキャンする方法、および既存のパイプラインを移行する方法について説明します。
制限事項
環境バージョンは、すべてのパイプライン機能とまだ互換性がありません。 パイプラインのPython コードが次のいずれかの処理を行うと、環境バージョンが設定されたパイプラインの実行が失敗します。
- パイプライン デコレーターで修飾された関数内の Spark セッション状態を変更します。 たとえば、
spark.conf.set(...)、spark.sql("USE CATALOG ...")、createOrReplaceTempViewなどがあります。 -
SparkContext、RDD、SQLContext、Py4J API など、Spark Connect で使用できない PySpark API を使用します。 Spark Connect でサポートされている内容を参照してください。
パイプラインで環境バージョンを有効にすると失敗する場合、環境バージョンを無効にすると、パイプラインは以前の状態に戻ります。
動作の変更
Spark Connect には、従来の PySpark ランタイムと少数の動作の違いがあります。 完全なリファレンスについては、 Spark Connect とクラシック Spark を参照してください。 互換性スキャンでは、これらのパターンが事前に検出され、対応されるまで有効化がブロックされるため、運用データに影響を与える前にパターンを見つけて修正できます。
パイプラインでは、動作が異なる可能性がある最も一般的な状況は次のとおりです。
- インターリーブされた DataFrame の構築とセッションの変更
- 変更可能なPython状態を参照する UDF
インターリーブされた DataFrame の構築とセッションの変更
パイプラインが DataFrame を構築し、Spark セッションの状態を変更し (たとえば、既定のカタログまたはスキーマの変更、構成の設定、一時ビューの置き換え、UDF の再登録など)、DataFrame を使用します。
- 環境バージョンがない場合、DataFrame は ミューテーション前 のセッション状態を使用します。
- 環境バージョンでは、DataFrame は 変更後 のセッション状態を使用します。
例えば次が挙げられます。
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
環境バージョンがない場合、 mytable には [(1, "Original Row")]が含まれます。 環境バージョンでは、 mytable には [(2, "Replaced Row")]が含まれます。
変更可能なPython状態を参照する UDF
UDF が定義された後に値が変化するPythonグローバル変数を UDF が参照する場合:
- 環境バージョンがない場合、UDF は変数の 最新 の値を使用します。
- 環境バージョンでは、UDF は UDF が 定義された時点の値を使用します。
例えば次が挙げられます。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
環境バージョンがない場合、 my_mv には [("alex_b",)]が含まれます。 環境バージョンでは、 my_mv には [("alex_a",)]が含まれます。
パイプラインがいずれかのパターンに依存している場合は、環境バージョンを有効にする前に監査します。
互換性スキャン
互換性スキャンは、有効にする前に、環境バージョンで異なる結果を生成するコード パターンをパイプラインで見つけるのに役立ちます。 スキャンはオプトインです。 パイプラインでスキャンが有効になっている場合:
- 各パイプライン実行では、検出されたパターンごとにパイプライン イベント ログに 1 つの
BehaviorChangeInSparkConnectWARNイベントが出力されます。 - 前の正常な更新プログラムのすべての互換性の警告に対処するまで、パイプラインで環境バージョンを有効にすることはできません。
スキャンが有効になっていない場合、イベントは生成されず、 environment_version 有効化はブロックされません。 Databricks では、パイプラインで環境バージョンを有効にする前に、スキャンを有効にし、検出されたパターンを解決することをお勧めします。
パイプラインでスキャンを有効にする
互換性スキャンを有効にするには、 pipelines.environmentVersion.enableCompatibilityScan パイプライン構成を追加します。 パイプライン エディター UI を使用するか、パイプライン構成 JSON にエントリを追加して、構成を追加できます。
UI を使用して次の操作を行います。
- パイプライン エディターで、[ 設定] をクリックします。
- パイプライン設定の [構成] セクションを見つけます。
- [
構成を追加します。
- キーとして
pipelines.environmentVersion.enableCompatibilityScanを入力し、値としてtrueします。 - パイプラインの設定を保存します。
パイプライン JSON で次の手順を実行します。
configuration ブロックに次のエントリを追加します。
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
推奨されるワークフロー
- パイプラインでスキャンを有効にします。
- パイプラインの実行をトリガーする。
-
WARNイベントのクエリを実行します。 問題コード、サンプル パターン、推奨される修正プログラムの完全な一覧については、 互換性イベント リファレンスを参照してください。 - 検出されたパターンを削除するようにパイプライン コードを更新し、それ以上イベントが生成されない限りパイプラインを再実行します。
- パイプラインで
environment_version方法のいずれかを使用して、パイプラインにを追加します。
互換性の警告が誤検知であると思われる場合に、 environment_version を有効にする場合は、パイプライン構成から pipelines.environmentVersion.enableCompatibilityScan エントリを削除してチェックをバイパスします。 (値を false に設定することはできません。エントリを完全に削除する必要があります)。
プレフライト チェックは、以前の更新がないパイプラインや、環境バージョンが既に設定されているパイプラインでは実行されません。
既存のパイプラインを環境バージョンに移行する
環境バージョンをまだ使用していない既存のパイプラインを移行するには、このエンドツーエンドのワークフローに従います。 Spark Connect で動作が異なる可能性があるコード パターンを見つけて修正し、環境バージョンを安全にロールアウトする手順について説明します。
パイプラインで互換性スキャンを有効にします。 互換性スキャンの説明に従って、パイプラインで スキャンを有効にします。 これは、検出されたパターンがイベント ログに表示される原因と、有効化の試行を保護するプレフライト チェックを有効にする原因です。
パイプラインの実行をトリガーし、互換性イベントを確認します。 通常のパイプライン更新をトリガーします。 正常に完了したら、パイプライン イベント ログに対して
BehaviorChangeInSparkConnectWARNイベントのクエリを実行します。 各イベントは、検出された 1 つのパターンを報告します。 問題コード、サンプル パターン、推奨される修正プログラムの完全な一覧については、 互換性イベント リファレンスを参照してください。検出されたパターンに対処するようにパイプライン コードを更新します。 検出されたパターンごとに、推奨される修正に従ってパイプライン コードを更新します。 変更するたびに、別のパイプライン更新をトリガーし、対応するイベントが表示されなくなったかどうかを確認します。 正常に更新されるまで、イベント ログで互換性イベントが表示されなくなるまで繰り返します。
パイプラインで環境バージョンを有効にします。 最新の正常な更新に互換性イベントがない場合は、「パイプラインで
environment_version」の説明に従って、UI、API、またはバンドルを使用してパイプラインにを追加します。 次の更新プログラムは、Spark Connect とピン留めされたPython言語バージョンとプレインストールされたライブラリで実行されます。互換性の警告がまだ存在するため更新に失敗した場合は、
environment_versionを削除し、手順 2 に戻り、残りの警告を解決してから再試行してください。移行を確認します。 環境バージョンの最初の更新が完了したら、次のことを確認します。
- イベント ログの
create_updateイベントには、期待される値に設定environment_versionが表示されます。 - パイプラインによって予期されるデータが生成され、新しいエラー イベントは表示されません。
- 「 動作の変更」で説明されている微妙な動作の違いがないか、ダウンストリーム テーブルをスポットチェックします。
- イベント ログの
ロールバック
移行後にパイプラインの動作が間違っている場合は、パイプライン設定から environment_version を削除します。 次の更新プログラムは、前のPythonランタイム構成で実行されます。 ロールバック実行を使用してデバッグし、問題を特定して修正した後、手順 2. から移行を繰り返します。
互換性イベントリファレンス
パイプラインで互換性スキャンが有効になっている場合、SDP は検出されたパターンごとにBehaviorChangeInSparkConnectに 1 つのWARN イベントを出力します。 スキャンが有効になっていて、以前の正常な更新でパターンが検出されると、SDP はパターンがアドレス指定されるまで environment_version 有効化もブロックします。
各イベントは、検出された内容を識別する 1 つの問題コードを報告します。 コードを検索するには、[ 問題コード ] テーブルでコードを見つけます。各行は、パターンの例と修正候補を含むカテゴリ セクションにリンクします。
イベント図形
BehaviorChangeInSparkConnect イベントは、標準の パイプライン イベント ログ スキーマに従います。
-
event_typeはbehavior_change_in_spark_connectです。 -
levelはWARNです。 -
detailsには、単一のbehavior_change_in_spark_connectフィールドを持つissueオブジェクトが含まれています。 問題の値は、以下に示すコードの 1 つです。 -
messageは、検出されたパターンの人間が判読できる説明です。
問題コード
| カテゴリ | 問題コード | Description |
|---|---|---|
| データベースとカタログの変更 | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
既定のカタログは、DataFrame の作成後に変更されました。 既存の DataFrame では、新しい既定のカタログを使用してテーブルを解決できます。 |
| データベースとカタログの変更 | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG は、パイプライン デコレーターによって修飾された関数の外部で呼び出されました。 既定のカタログは、後続の操作で予期せず変更される可能性があります。 |
| データベースとカタログの変更 | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
既定のデータベースは、DataFrame の作成後に変更されました。 既存の DataFrame は、新しい既定のデータベースを使用してテーブルを解決できます。 |
| データベースとカタログの変更 | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE は、パイプライン デコレーターによって修飾された関数の外部で呼び出されました。 既定のデータベースは、後続の操作で予期せず変更される可能性があります。 |
| フロー関数内での一括実行 | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数はチェックポイント コマンドを呼び出します。 |
| フロー関数内での一括実行 | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、DataFrame ビュー (createOrReplaceTempView など) を熱心に作成します。 |
| フロー関数内での一括実行 | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、リソース プロファイルを作成します。 |
| フロー関数内での一括実行 | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、 spark.resources または関連するリソース API を呼び出します。 |
| フロー関数内での一括実行 | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、ターゲット テーブルに対して一括 MERGE INTO を実行します。 |
| フロー関数内での一括実行 | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、一括 Spark ML 操作を実行します。 |
| フロー関数内での一括実行 | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、Pythonデータ ソースを登録します。 |
| フロー関数内での一括実行 | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、アクティブなストリーミング クエリ ハンドルで動作します。 |
| フロー関数内での一括実行 | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、ストリーミング クエリ リスナーを登録または削除します。 |
| フロー関数内での一括実行 | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、ストリーミング クエリを管理するために spark.streams を呼び出します。 |
| フロー関数内での一括実行 | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、一括 DataFrameWriterV2 操作を実行します。 |
| フロー関数内での一括実行 | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、一括 DataFrame.write 操作を実行します。 |
| フロー関数内での一括実行 | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
フロー関数は、ストリーミング クエリ (writeStream.start()) を開始します。 |
| Spark 構成の変更 | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() または、パイプライン デコレーターによって修飾された関数内で呼び出された spark.conf.unset() 。 これは、環境バージョンではサポートされていません。 |
| Spark 構成の変更 | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() は、DataFrame の作成後にパイプライン デコレーターによって修飾された関数の外部で呼び出されました。 構成の変更は、実行時に既存の DataFrame に影響する可能性があります。 |
| Spark 構成の変更 | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() は、DataFrame の作成後にパイプライン デコレーターによって修飾された関数の外部で呼び出されました。 構成の変更は、実行時に既存の DataFrame に影響する可能性があります。 |
| 一時的なビューの置換 | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
グローバル一時ビューは、それを参照する DataFrame の作成後に置き換えられました。 置換は、既存の DataFrame に反映される場合があります。 |
| 一時的なビューの置換 | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
一時ビューは、それを参照する DataFrame の作成後に置き換えられました。 置換は、既存の DataFrame に反映される場合があります。 |
| UDF と UDTF の変更 | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
UDF は、作成されたデータフレームを参照した後、同じ名前で再登録されました。 既存の DataFrame では、新しい UDF 定義を使用できます。 |
| UDF と UDTF の変更 | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
UDTF は、それを参照する DataFrame が作成された後、同じ名前で再登録されました。 既存の DataFrame では、新しい UDTF 定義を使用できます。 |
| UDF と UDTF の変更 | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
UDF は、グローバル変更可能なPython変数を参照します。 環境バージョンでは、UDF は、呼び出し時ではなく、UDF が定義された時点で変数の値を使用します。 |
| UDF と UDTF の変更 | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
UDTF は、グローバル変更可能なPython変数を参照します。 環境バージョンでは、UDTF は、UDTF が定義された時点で、呼び出し時ではなく変数の値を使用します。 |
データベースとカタログの変更
これらの問題は、パイプライン コードが既定のデータベースまたはカタログを変更するときに生成されます。 環境バージョンでは、変更の前に構築された DataFrame は、新しいデータベースまたはカタログを使用してテーブルを解決できます。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
環境バージョンがない場合、dfはevents カタログからのmarketingを解決します。 環境バージョンでは、dfはevents カタログからのsalesを解決します。
推奨される修正: テーブル名を完全修飾して、解決が既定のカタログまたはデータベースに依存しないようにし、DataFrame の作成と使用の間で既定のカタログまたはデータベースを変更しないようにします。
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Spark 構成の変更
これらの問題は、パイプライン コードが環境バージョンで DataFrame の動作を変更できる方法で Spark 構成を変更するときに生成されます。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
環境バージョンがない場合、キャストは DataFrame 作成時に conf 値を使用します。 環境バージョンでは、キャストは spark.sql.ansi.enabled=true を使用し、無効な入力で失敗する可能性があります。
推奨される修正: DataFrame が作成される前に、必要なすべての Spark 構成をパイプライン ファイルの先頭に設定します。 クエリごとの構成の場合は、パイプライン 仕様でパイプラインの configuration 設定を使用します。
一時的なビューの置換
これらの問題は、パイプライン コードが、それを参照する DataFrame の作成後に一時ビューを置き換えるときに生成されます。 環境バージョンでは、既存の DataFrame に新しいビューの内容が反映される場合があります。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
環境バージョンがない場合、 mytable には [(1, "Original Row")]が含まれます。 環境バージョンでは、 mytable には [(2, "Replaced Row")]が含まれます。
推奨される修正: 各一時ビューを 1 回作成し、置き換えないでください。 関連データを含む複数のビューが必要な場合は、それぞれに個別の名前を付けます。
UDF と UDTF の変更
これらの問題は、パイプライン コードが環境バージョンでの動作を変更する方法で UDF または UDTF を変更するときに生成されます。
イベントをトリガーするパターンの例:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
環境バージョンがない場合、 my_mv には [("alex_b",)]が含まれます。 環境バージョンでは、 my_mv には [("alex_a",)]が含まれます。
Suggested fix: 値をPythonグローバルからキャプチャするのではなく、引数として UDF に渡すか、UDF を定義する前にグローバルを設定し、後で変更しないでください。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
フロー関数内での一括実行
これらの問題は、パイプライン コードがパイプライン デコレーター (@table、 @materialized_view など) によって装飾された関数内で一括 Spark コマンドを実行すると生成されます。 フロー関数は、DataFrame を定義して返す必要があります。データの書き込み、ストリーミング クエリの管理、リソースの登録、または ML 操作の実行を行う一括コマンドは、環境バージョン セットを持つフロー関数内では許可されません。
推奨される修正: 一括操作をフロー関数の外部に移動し、代わりにフロー関数から DataFrame を返します。 テーブルへの書き込み、ストリーミング クエリの開始などの副作用は、パイプライン定義の外部に属します。パイプライン エンジンは、フロー関数によって返される DataFrame の具体化を処理します。
イベント ログで互換性イベントを検索する
次のクエリは、パイプラインのすべての互換性イベントを返します。最新の順序が最初に並べ替えられます。
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
最近の更新で問題コード別にイベントをカウントするには:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
イベント ログのクエリを実行する方法については、イベント ログのクエリを参照してください。
その他のリソース
- パイプラインの環境バージョンを構成する - 機能の概要、環境バージョンを有効にする方法。
- パイプライン イベント ログ スキーマ — パイプライン イベント ログ スキーマ全体。
- パイプライン イベント ログ — パイプライン イベント ログに対してクエリを実行する方法。