Compatibilità delle versioni dell'ambiente

Importante

Le versioni dell'ambiente per SDP sono in versione beta.

Le pipeline con una versione environment impostare l'esecuzione del codice Python tramite Spark Connect. Questa pagina illustra cosa non è compatibile, cosa si comporta in modo diverso, come analizzare una pipeline per individuare i modelli interessati e come eseguire la migrazione di una pipeline esistente.

Limitations

Le versioni dell'ambiente non sono ancora compatibili con tutte le funzionalità della pipeline. Un'esecuzione della pipeline con un set di versioni dell'ambiente ha esito negativo se il codice Python della pipeline esegue una delle operazioni seguenti:

  • Modifica lo stato della sessione Spark all'interno di una funzione decorata con un decorator di pipeline. Gli esempi includono spark.conf.set(...), spark.sql("USE CATALOG ...")e createOrReplaceTempView.
  • Usa le API PySpark non disponibili in Spark Connect, tra cui SparkContext, RDDSQLContext, e qualsiasi API Py4J. Vedere Informazioni supportate in Spark Connect.

Se l'abilitazione di una versione dell'ambiente in una pipeline causa un errore, la disabilitazione della versione dell'ambiente restituisce la pipeline allo stato precedente.

Modifiche del comportamento

Spark Connect presenta un numero ridotto di differenze di comportamento rispetto al runtime PySpark classico. Per informazioni di riferimento complete, vedere Spark Connect e Spark classico . L'analisi compatibilità rileva questi modelli in anticipo e blocca l'abilitazione fino a quando non vengono risolti, in modo da poterli trovare e correggere prima di influire sui dati di produzione.

In una pipeline, le situazioni più comuni in cui il comportamento può differire è:

Costruzione di dataframe interleaved e mutazione della sessione

Quando una pipeline costruisce un dataframe, modifica lo stato della sessione Spark, ad esempio modifica il catalogo o lo schema predefinito, imposta una configurazione, sostituisce una visualizzazione temporanea o registra nuovamente una funzione definita dall'utente, quindi usa il dataframe:

  • Senza una versione dell'ambiente, il dataframe usa lo stato della sessione di pre-mutazione .
  • Con una versione dell'ambiente, il dataframe usa lo stato della sessione post-mutazione .

Per esempio:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Senza una versione dell'ambiente, mytable contiene [(1, "Original Row")]. Con una versione dell'ambiente, mytable contiene [(2, "Replaced Row")].

UDF che fanno riferimento allo stato di Python modificabile

Quando una funzione definita dall'utente fa riferimento a una variabile globale Python il cui valore cambia dopo la definizione della funzione definita dall'utente:

  • Senza una versione dell'ambiente, la funzione definita dall'utente usa il valore più recente della variabile.
  • Con una versione dell'ambiente, la funzione definita dall'utente usa il valore al momento della definizione della funzione definita dall'utente.

Per esempio:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Senza una versione dell'ambiente, my_mv contiene [("alex_b",)]. Con una versione dell'ambiente, my_mv contiene [("alex_a",)].

Se una pipeline si basa su uno dei due modelli, controllarla prima di abilitare una versione dell'ambiente.

Analisi compatibilità

L'analisi di compatibilità consente di trovare modelli di codice nella pipeline che produrrebbero risultati diversi in una versione dell'ambiente, prima di abilitare uno. L'analisi è esplicita. Quando l'analisi è abilitata in una pipeline:

  • Ogni esecuzione della pipeline genera un BehaviorChangeInSparkConnectWARN evento nel registro eventi della pipeline per ogni modello rilevato.
  • Non è possibile abilitare una versione dell'ambiente nella pipeline finché non vengono visualizzati tutti gli avvisi di compatibilità dell'aggiornamento precedente.

Se l'analisi non è abilitata, non vengono generati eventi e environment_version l'abilitazione non viene bloccata. Databricks consiglia di abilitare l'analisi e la risoluzione di eventuali modelli rilevati prima di abilitare una versione dell'ambiente nella pipeline.

Abilitare l'analisi in una pipeline

È possibile abilitare l'analisi di compatibilità aggiungendo la configurazione della pipeline È possibile aggiungere la pipelines.environmentVersion.enableCompatibilityScan configurazione tramite l'interfaccia utente dell'editor della pipeline o aggiungendo una voce al codice JSON di configurazione della pipeline.

Tramite l'interfaccia utente:

  1. Nell'editor della pipeline fare clic su Impostazioni.
  2. Trovare la sezione Configurazione nelle impostazioni della pipeline.
  3. Fare clic sull'icona Con il segno più.Aggiungere la configurazione.
  4. Immettere pipelines.environmentVersion.enableCompatibilityScan come chiave e true come valore.
  5. Salvare le impostazioni della pipeline.

Nel codice JSON della pipeline:

Aggiungere la voce seguente al configuration blocco :

"configuration": {
  "pipelines.environmentVersion.enableCompatibilityScan": "true"
}
  1. Abilitare l'analisi nella pipeline.
  2. Attivare un'esecuzione della pipeline.
  3. Eseguire una query sul registro eventi della pipeline per individuare BehaviorChangeInSparkConnectWARN gli eventi. Vedere Informazioni di riferimento sugli eventi di compatibilità per l'elenco completo di codici di problema, modelli di esempio e correzioni suggerite.
  4. Aggiornare il codice della pipeline per rimuovere i modelli rilevati ed eseguire di nuovo la pipeline fino a quando non vengono generati altri eventi.
  5. Aggiungere environment_version alla pipeline usando uno dei metodi in Abilitare una versione dell'ambiente in una pipeline.

Se si ritiene che un avviso di compatibilità sia un falso positivo e si voglia abilitare environment_version comunque, rimuovere la pipelines.environmentVersion.enableCompatibilityScan voce dalla configurazione della pipeline per ignorare il controllo. L'impostazione del valore su false non è consentita. È necessario rimuovere completamente la voce.

Il controllo preliminare non viene eseguito sulle pipeline che non hanno alcun aggiornamento precedente o sulle pipeline che hanno già un set di versioni dell'ambiente.

Eseguire la migrazione di una pipeline esistente alle versioni dell'ambiente

Per eseguire la migrazione di una pipeline esistente che non usa ancora una versione dell'ambiente, seguire questo flusso di lavoro end-to-end. Illustra come trovare modelli di codice che possono comportarsi in modo diverso in Spark Connect, correggerli e implementare la versione dell'ambiente in modo sicuro.

  1. Abilitare l'analisi di compatibilità nella pipeline. Abilitare l'analisi sulla pipeline come descritto in Analisi compatibilità. Questo è ciò che causa la superficie dei modelli rilevati nel registro eventi e ciò che consente il controllo preliminare che protegge il tentativo di abilitazione.

  2. Attivare un'esecuzione della pipeline ed esaminare gli eventi di compatibilità. Attivare un normale aggiornamento della pipeline. Al termine, eseguire una query sul registro eventi della pipeline per individuare BehaviorChangeInSparkConnectWARN gli eventi. Ogni evento segnala un modello rilevato. Vedere Informazioni di riferimento sugli eventi di compatibilità per l'elenco completo di codici di problema, modelli di esempio e correzioni suggerite.

  3. Aggiornare il codice della pipeline per risolvere i modelli rilevati. Per ogni modello rilevato, aggiornare il codice della pipeline seguendo la correzione suggerita. Dopo ogni modifica, attivare un altro aggiornamento della pipeline e verificare che gli eventi corrispondenti non vengano più visualizzati. Ripetere fino a quando il registro eventi non presenta più eventi di compatibilità per un aggiornamento riuscito.

  4. Abilitare la versione dell'ambiente nella pipeline. Dopo che l'aggiornamento con esito positivo più recente non ha eventi di compatibilità, aggiungere environment_version alla pipeline usando l'interfaccia utente, l'API o il bundle, come descritto in Abilitare una versione dell'ambiente in una pipeline. L'aggiornamento successivo viene eseguito con Spark Connect e con la versione del linguaggio aggiunta Python e le librerie preinstallate.

    Se l'aggiornamento non riesce perché esistono ancora avvisi di compatibilità, eliminare environment_version, tornare al passaggio 2 e risolvere gli avvisi rimanenti prima di riprovare.

  5. Verificare la migrazione. Al termine del primo aggiornamento con la versione dell'ambiente, verificare:

    • L'evento create_update nel registro eventi mostra environment_version impostato sul valore previsto.
    • La pipeline produce i dati previsti e non vengono visualizzati nuovi eventi di errore.
    • Tabelle downstream di controllo spot per eventuali differenze di comportamento sottili descritte in Modifiche del comportamento.

Rollback

Se la pipeline non funziona correttamente dopo la migrazione, rimuovere dalle environment_version impostazioni della pipeline. L'aggiornamento successivo viene eseguito con la configurazione di runtime Python precedente. Usare l'esecuzione di cui è stato eseguito il rollback per eseguire il debug, quindi ripetere la migrazione dal passaggio 2 dopo aver identificato e risolto il problema.

Informazioni di riferimento sugli eventi di compatibilità

Quando l'analisi di compatibilità è abilitata in una pipeline, SDP genera un BehaviorChangeInSparkConnectWARN evento nel registro eventi della pipeline per ogni modello rilevato. Quando l'analisi è abilitata e l'aggiornamento con esito positivo precedente ha rilevato modelli, SDP blocca environment_version anche l'abilitazione fino a quando non vengono risolti i modelli.

Ogni evento segnala un singolo codice di problema che identifica ciò che è stato rilevato. Per cercare un codice, trovarlo nella tabella Codici di problema: ogni riga è collegata alla sezione categoria che contiene un modello di esempio e la correzione suggerita.

Forma evento

BehaviorChangeInSparkConnect gli eventi seguono lo schema del registro eventi della pipeline standard:

  • event_type è behavior_change_in_spark_connect.
  • level è WARN.
  • details contiene l'oggetto behavior_change_in_spark_connect , che ha un singolo issue campo. Il valore del problema è uno dei codici elencati di seguito.
  • message è una descrizione leggibile del modello rilevato.

Codici di problema

Categoria Codice del problema Description
Mutazioni del database e del catalogo USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Il catalogo predefinito è stato modificato dopo la creazione di un dataframe. Il dataframe esistente può risolvere le tabelle usando il nuovo catalogo predefinito.
Mutazioni del database e del catalogo USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE CATALOG è stato chiamato all'esterno di una funzione decorata da un decorator pipeline. Il catalogo predefinito può cambiare in modo imprevisto per le operazioni successive.
Mutazioni del database e del catalogo USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Il database predefinito è stato modificato dopo la creazione di un dataframe. Il dataframe esistente può risolvere le tabelle usando il nuovo database predefinito.
Mutazioni del database e del catalogo USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE DATABASE è stato chiamato all'esterno di una funzione decorata da un decorator pipeline. Il database predefinito può cambiare in modo imprevisto per le operazioni successive.
Esecuzione eager all'interno di funzioni di flusso CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione flow chiama un comando checkpoint.
Esecuzione eager all'interno di funzioni di flusso CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso crea in modo eager una vista DataFrame (createOrReplaceTempView o simile).
Esecuzione eager all'interno di funzioni di flusso CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione flow crea un profilo di risorsa.
Esecuzione eager all'interno di funzioni di flusso GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso chiama spark.resources o un'API risorsa correlata.
Esecuzione eager all'interno di funzioni di flusso MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso esegue un eager MERGE INTO in una tabella di destinazione.
Esecuzione eager all'interno di funzioni di flusso ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione flow esegue un'operazione spark ML eager.
Esecuzione eager all'interno di funzioni di flusso REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso registra un'origine dati Python.
Esecuzione eager all'interno di funzioni di flusso STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso opera su un handle di query di streaming attivo.
Esecuzione eager all'interno di funzioni di flusso STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso registra o rimuove un listener di query di streaming.
Esecuzione eager all'interno di funzioni di flusso STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso chiama spark.streams per gestire le query di streaming.
Esecuzione eager all'interno di funzioni di flusso WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione flusso esegue un'operazione eager DataFrameWriterV2 .
Esecuzione eager all'interno di funzioni di flusso WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione flusso esegue un'operazione eager DataFrame.write .
Esecuzione eager all'interno di funzioni di flusso WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED La funzione di flusso avvia una query di streaming (writeStream.start()).
Mutazioni della configurazione di Spark CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED spark.conf.set() o spark.conf.unset() è stato chiamato all'interno di una funzione decorata da un decorator di pipeline. Questa opzione non è supportata con una versione dell'ambiente.
Mutazioni della configurazione di Spark SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.set() è stato chiamato all'esterno di una funzione decorata da un elemento decorator di pipeline dopo la creazione di un dataframe. La modifica della configurazione può influire sul dataframe esistente in fase di esecuzione.
Mutazioni della configurazione di Spark UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.unset() è stato chiamato all'esterno di una funzione decorata da un elemento decorator di pipeline dopo la creazione di un dataframe. La modifica della configurazione può influire sul dataframe esistente in fase di esecuzione.
Sostituzioni di viste temporanee REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Una visualizzazione temporanea globale è stata sostituita dopo la creazione di un dataframe che lo fa riferimento. La sostituzione può essere riflessa nel dataframe esistente.
Sostituzioni di viste temporanee REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Una visualizzazione temporanea è stata sostituita dopo la creazione di un dataframe che lo fa riferimento. La sostituzione può essere riflessa nel dataframe esistente.
Mutazioni UDF e UDFTF OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Una funzione definita dall'utente è stata riregistrata con lo stesso nome dopo la creazione di un dataframe che lo fa riferimento. Il dataframe esistente può usare la nuova definizione di funzione definita dall'utente.
Mutazioni UDF e UDFTF OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Un tipo definito dall'utente è stato registrato nuovamente con lo stesso nome dopo la creazione di un dataframe che lo fa riferimento. Il dataframe esistente può usare la nuova definizione UDTF.
Mutazioni UDF e UDFTF UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Una funzione definita dall'utente fa riferimento a una variabile di Python modificabile globale. Con una versione dell'ambiente, la funzione definita dall'utente usa il valore della variabile al momento della definizione della funzione definita dall'utente, non in fase di chiamata.
Mutazioni UDF e UDFTF UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Un tipo definito dall'utente fa riferimento a una variabile modificabile globale Python. Con una versione dell'ambiente, UDTF usa il valore della variabile al momento della definizione del tipo definito dall'utente, non in fase di chiamata.

Mutazioni del database e del catalogo

Questi problemi vengono generati quando il codice della pipeline modifica il database o il catalogo predefinito. Con una versione dell'ambiente, i dataframe costruiti prima della mutazione possono risolvere le tabelle usando il nuovo database o il catalogo.

Modello di esempio che attiva un evento:

from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Senza una versione dell'ambiente, df viene events risolto dal marketing catalogo. Con una versione dell'ambiente, df viene events risolto dal sales catalogo.

Correzione suggerita: Qualificare completamente i nomi di tabella in modo che la risoluzione non dipende dal catalogo o dal database predefinito ed evitare di modificare il catalogo predefinito o il database tra la creazione e l'uso del dataframe.

from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Mutazioni della configurazione di Spark

Questi problemi vengono generati quando il codice della pipeline modifica la configurazione di Spark in modi che possono modificare il comportamento del dataframe in una versione dell'ambiente.

Modello di esempio che attiva un evento:

from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
  return df.selectExpr("CAST(price AS INT) AS price")

Senza una versione dell'ambiente, il cast usa il valore conf in fase di creazione del dataframe. Con una versione dell'ambiente, il cast usa spark.sql.ansi.enabled=true e potrebbe non riuscire in input non valido.

Correzione suggerita: Impostare tutte le configurazioni Spark necessarie nella parte superiore del file della pipeline, prima che venga creato un dataframe. Per la configurazione per query, usare l'impostazione della configuration pipeline nella specifica della pipeline.

Sostituzioni di viste temporanee

Questi problemi vengono generati quando il codice della pipeline sostituisce una visualizzazione temporanea dopo la creazione di un dataframe che fa riferimento. Con una versione dell'ambiente, il dataframe esistente può riflettere il nuovo contenuto della visualizzazione.

Modello di esempio che attiva un evento:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Senza una versione dell'ambiente, mytable contiene [(1, "Original Row")]. Con una versione dell'ambiente, mytable contiene [(2, "Replaced Row")].

Correzione suggerita: Creare ogni visualizzazione temporanea una sola volta e non sostituirla. Se sono necessarie più visualizzazioni con dati correlati, assegnare a ognuno un nome distinto.

Mutazioni UDF e UDFTF

Questi problemi vengono generati quando il codice della pipeline modifica una funzione definita dall'utente o un tipo definito dall'utente in modi che modificano il comportamento in una versione dell'ambiente.

Modello di esempio che attiva un evento:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Senza una versione dell'ambiente, my_mv contiene [("alex_b",)]. Con una versione dell'ambiente, my_mv contiene [("alex_a",)].

Suggested fix: Passare i valori nella funzione definita dall'utente come argomenti anziché acquisire tali valori da Python globali o impostare il valore globale prima di definire la funzione definita dall'utente e non modificarla in seguito.

from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
  return s + suffix

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

Esecuzione eager all'interno di funzioni di flusso

Questi problemi vengono generati quando il codice della pipeline esegue un comando Spark eager all'interno di una funzione decorata da un decorator di pipeline (@table, @materialized_viewe così via). Le funzioni di flusso devono definire e restituire un dataframe; I comandi eager che scrivono dati, gestiscono query di streaming, registrano risorse o eseguono operazioni di Machine Learning non sono consentiti all'interno di una funzione di flusso con un set di versioni dell'ambiente.

Correzione suggerita: Spostare l'operazione eager all'esterno della funzione di flusso e restituire un dataframe dalla funzione di flusso. Gli effetti collaterali, ad esempio la scrittura in una tabella o l'avvio di una query di streaming, appartengono all'esterno della definizione della pipeline; il motore della pipeline gestisce la materializzazione del dataframe restituito dalla funzione di flusso.

Trovare gli eventi di compatibilità nel registro eventi

La query seguente restituisce tutti gli eventi di compatibilità per una pipeline, ordinati prima di tutto:

SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

Per contare gli eventi in base al codice del problema negli aggiornamenti recenti:

SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

Per informazioni su come eseguire query sul registro eventi, vedere Eseguire query nel registro eventi.

Vedere anche