Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Le versioni dell'ambiente per SDP sono in versione beta.
Le pipeline con una versione environment impostare l'esecuzione del codice Python tramite Spark Connect. Questa pagina illustra cosa non è compatibile, cosa si comporta in modo diverso, come analizzare una pipeline per individuare i modelli interessati e come eseguire la migrazione di una pipeline esistente.
Limitations
Le versioni dell'ambiente non sono ancora compatibili con tutte le funzionalità della pipeline. Un'esecuzione della pipeline con un set di versioni dell'ambiente ha esito negativo se il codice Python della pipeline esegue una delle operazioni seguenti:
- Modifica lo stato della sessione Spark all'interno di una funzione decorata con un decorator di pipeline. Gli esempi includono
spark.conf.set(...),spark.sql("USE CATALOG ...")ecreateOrReplaceTempView. - Usa le API PySpark non disponibili in Spark Connect, tra cui
SparkContext,RDDSQLContext, e qualsiasi API Py4J. Vedere Informazioni supportate in Spark Connect.
Se l'abilitazione di una versione dell'ambiente in una pipeline causa un errore, la disabilitazione della versione dell'ambiente restituisce la pipeline allo stato precedente.
Modifiche del comportamento
Spark Connect presenta un numero ridotto di differenze di comportamento rispetto al runtime PySpark classico. Per informazioni di riferimento complete, vedere Spark Connect e Spark classico . L'analisi compatibilità rileva questi modelli in anticipo e blocca l'abilitazione fino a quando non vengono risolti, in modo da poterli trovare e correggere prima di influire sui dati di produzione.
In una pipeline, le situazioni più comuni in cui il comportamento può differire è:
- Costruzione di dataframe interleaved e mutazione della sessione
- UDF che fanno riferimento allo stato modificabile Python
Costruzione di dataframe interleaved e mutazione della sessione
Quando una pipeline costruisce un dataframe, modifica lo stato della sessione Spark, ad esempio modifica il catalogo o lo schema predefinito, imposta una configurazione, sostituisce una visualizzazione temporanea o registra nuovamente una funzione definita dall'utente, quindi usa il dataframe:
- Senza una versione dell'ambiente, il dataframe usa lo stato della sessione di pre-mutazione .
- Con una versione dell'ambiente, il dataframe usa lo stato della sessione post-mutazione .
Per esempio:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Senza una versione dell'ambiente, mytable contiene [(1, "Original Row")]. Con una versione dell'ambiente, mytable contiene [(2, "Replaced Row")].
UDF che fanno riferimento allo stato di Python modificabile
Quando una funzione definita dall'utente fa riferimento a una variabile globale Python il cui valore cambia dopo la definizione della funzione definita dall'utente:
- Senza una versione dell'ambiente, la funzione definita dall'utente usa il valore più recente della variabile.
- Con una versione dell'ambiente, la funzione definita dall'utente usa il valore al momento della definizione della funzione definita dall'utente.
Per esempio:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Senza una versione dell'ambiente, my_mv contiene [("alex_b",)]. Con una versione dell'ambiente, my_mv contiene [("alex_a",)].
Se una pipeline si basa su uno dei due modelli, controllarla prima di abilitare una versione dell'ambiente.
Analisi compatibilità
L'analisi di compatibilità consente di trovare modelli di codice nella pipeline che produrrebbero risultati diversi in una versione dell'ambiente, prima di abilitare uno. L'analisi è esplicita. Quando l'analisi è abilitata in una pipeline:
- Ogni esecuzione della pipeline genera un
BehaviorChangeInSparkConnectWARNevento nel registro eventi della pipeline per ogni modello rilevato. - Non è possibile abilitare una versione dell'ambiente nella pipeline finché non vengono visualizzati tutti gli avvisi di compatibilità dell'aggiornamento precedente.
Se l'analisi non è abilitata, non vengono generati eventi e environment_version l'abilitazione non viene bloccata. Databricks consiglia di abilitare l'analisi e la risoluzione di eventuali modelli rilevati prima di abilitare una versione dell'ambiente nella pipeline.
Abilitare l'analisi in una pipeline
È possibile abilitare l'analisi di compatibilità aggiungendo la configurazione della pipeline È possibile aggiungere la pipelines.environmentVersion.enableCompatibilityScan configurazione tramite l'interfaccia utente dell'editor della pipeline o aggiungendo una voce al codice JSON di configurazione della pipeline.
Tramite l'interfaccia utente:
- Nell'editor della pipeline fare clic su Impostazioni.
- Trovare la sezione Configurazione nelle impostazioni della pipeline.
- Fare clic
Aggiungere la configurazione.
- Immettere
pipelines.environmentVersion.enableCompatibilityScancome chiave etruecome valore. - Salvare le impostazioni della pipeline.
Nel codice JSON della pipeline:
Aggiungere la voce seguente al configuration blocco :
"configuration": {
"pipelines.environmentVersion.enableCompatibilityScan": "true"
}
Flusso di lavoro consigliato
- Abilitare l'analisi nella pipeline.
- Attivare un'esecuzione della pipeline.
-
Eseguire una query sul registro eventi della pipeline per individuare
BehaviorChangeInSparkConnectWARNgli eventi. Vedere Informazioni di riferimento sugli eventi di compatibilità per l'elenco completo di codici di problema, modelli di esempio e correzioni suggerite. - Aggiornare il codice della pipeline per rimuovere i modelli rilevati ed eseguire di nuovo la pipeline fino a quando non vengono generati altri eventi.
- Aggiungere
environment_versionalla pipeline usando uno dei metodi in Abilitare una versione dell'ambiente in una pipeline.
Se si ritiene che un avviso di compatibilità sia un falso positivo e si voglia abilitare environment_version comunque, rimuovere la pipelines.environmentVersion.enableCompatibilityScan voce dalla configurazione della pipeline per ignorare il controllo. L'impostazione del valore su false non è consentita. È necessario rimuovere completamente la voce.
Il controllo preliminare non viene eseguito sulle pipeline che non hanno alcun aggiornamento precedente o sulle pipeline che hanno già un set di versioni dell'ambiente.
Eseguire la migrazione di una pipeline esistente alle versioni dell'ambiente
Per eseguire la migrazione di una pipeline esistente che non usa ancora una versione dell'ambiente, seguire questo flusso di lavoro end-to-end. Illustra come trovare modelli di codice che possono comportarsi in modo diverso in Spark Connect, correggerli e implementare la versione dell'ambiente in modo sicuro.
Abilitare l'analisi di compatibilità nella pipeline. Abilitare l'analisi sulla pipeline come descritto in Analisi compatibilità. Questo è ciò che causa la superficie dei modelli rilevati nel registro eventi e ciò che consente il controllo preliminare che protegge il tentativo di abilitazione.
Attivare un'esecuzione della pipeline ed esaminare gli eventi di compatibilità. Attivare un normale aggiornamento della pipeline. Al termine, eseguire una query sul registro eventi della pipeline per individuare
BehaviorChangeInSparkConnectWARNgli eventi. Ogni evento segnala un modello rilevato. Vedere Informazioni di riferimento sugli eventi di compatibilità per l'elenco completo di codici di problema, modelli di esempio e correzioni suggerite.Aggiornare il codice della pipeline per risolvere i modelli rilevati. Per ogni modello rilevato, aggiornare il codice della pipeline seguendo la correzione suggerita. Dopo ogni modifica, attivare un altro aggiornamento della pipeline e verificare che gli eventi corrispondenti non vengano più visualizzati. Ripetere fino a quando il registro eventi non presenta più eventi di compatibilità per un aggiornamento riuscito.
Abilitare la versione dell'ambiente nella pipeline. Dopo che l'aggiornamento con esito positivo più recente non ha eventi di compatibilità, aggiungere
environment_versionalla pipeline usando l'interfaccia utente, l'API o il bundle, come descritto in Abilitare una versione dell'ambiente in una pipeline. L'aggiornamento successivo viene eseguito con Spark Connect e con la versione del linguaggio aggiunta Python e le librerie preinstallate.Se l'aggiornamento non riesce perché esistono ancora avvisi di compatibilità, eliminare
environment_version, tornare al passaggio 2 e risolvere gli avvisi rimanenti prima di riprovare.Verificare la migrazione. Al termine del primo aggiornamento con la versione dell'ambiente, verificare:
- L'evento
create_updatenel registro eventi mostraenvironment_versionimpostato sul valore previsto. - La pipeline produce i dati previsti e non vengono visualizzati nuovi eventi di errore.
- Tabelle downstream di controllo spot per eventuali differenze di comportamento sottili descritte in Modifiche del comportamento.
- L'evento
Rollback
Se la pipeline non funziona correttamente dopo la migrazione, rimuovere dalle environment_version impostazioni della pipeline. L'aggiornamento successivo viene eseguito con la configurazione di runtime Python precedente. Usare l'esecuzione di cui è stato eseguito il rollback per eseguire il debug, quindi ripetere la migrazione dal passaggio 2 dopo aver identificato e risolto il problema.
Informazioni di riferimento sugli eventi di compatibilità
Quando l'analisi di compatibilità è abilitata in una pipeline, SDP genera un BehaviorChangeInSparkConnectWARN evento nel registro eventi della pipeline per ogni modello rilevato. Quando l'analisi è abilitata e l'aggiornamento con esito positivo precedente ha rilevato modelli, SDP blocca environment_version anche l'abilitazione fino a quando non vengono risolti i modelli.
Ogni evento segnala un singolo codice di problema che identifica ciò che è stato rilevato. Per cercare un codice, trovarlo nella tabella Codici di problema: ogni riga è collegata alla sezione categoria che contiene un modello di esempio e la correzione suggerita.
Forma evento
BehaviorChangeInSparkConnect gli eventi seguono lo schema del registro eventi della pipeline standard:
-
event_typeèbehavior_change_in_spark_connect. -
levelèWARN. -
detailscontiene l'oggettobehavior_change_in_spark_connect, che ha un singoloissuecampo. Il valore del problema è uno dei codici elencati di seguito. -
messageè una descrizione leggibile del modello rilevato.
Codici di problema
| Categoria | Codice del problema | Description |
|---|---|---|
| Mutazioni del database e del catalogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Il catalogo predefinito è stato modificato dopo la creazione di un dataframe. Il dataframe esistente può risolvere le tabelle usando il nuovo catalogo predefinito. |
| Mutazioni del database e del catalogo | USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE CATALOG è stato chiamato all'esterno di una funzione decorata da un decorator pipeline. Il catalogo predefinito può cambiare in modo imprevisto per le operazioni successive. |
| Mutazioni del database e del catalogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Il database predefinito è stato modificato dopo la creazione di un dataframe. Il dataframe esistente può risolvere le tabelle usando il nuovo database predefinito. |
| Mutazioni del database e del catalogo | USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR |
USE DATABASE è stato chiamato all'esterno di una funzione decorata da un decorator pipeline. Il database predefinito può cambiare in modo imprevisto per le operazioni successive. |
| Esecuzione eager all'interno di funzioni di flusso | CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione flow chiama un comando checkpoint. |
| Esecuzione eager all'interno di funzioni di flusso | CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso crea in modo eager una vista DataFrame (createOrReplaceTempView o simile). |
| Esecuzione eager all'interno di funzioni di flusso | CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione flow crea un profilo di risorsa. |
| Esecuzione eager all'interno di funzioni di flusso | GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso chiama spark.resources o un'API risorsa correlata. |
| Esecuzione eager all'interno di funzioni di flusso | MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso esegue un eager MERGE INTO in una tabella di destinazione. |
| Esecuzione eager all'interno di funzioni di flusso | ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione flow esegue un'operazione spark ML eager. |
| Esecuzione eager all'interno di funzioni di flusso | REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso registra un'origine dati Python. |
| Esecuzione eager all'interno di funzioni di flusso | STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso opera su un handle di query di streaming attivo. |
| Esecuzione eager all'interno di funzioni di flusso | STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso registra o rimuove un listener di query di streaming. |
| Esecuzione eager all'interno di funzioni di flusso | STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso chiama spark.streams per gestire le query di streaming. |
| Esecuzione eager all'interno di funzioni di flusso | WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione flusso esegue un'operazione eager DataFrameWriterV2 . |
| Esecuzione eager all'interno di funzioni di flusso | WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione flusso esegue un'operazione eager DataFrame.write . |
| Esecuzione eager all'interno di funzioni di flusso | WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED |
La funzione di flusso avvia una query di streaming (writeStream.start()). |
| Mutazioni della configurazione di Spark | CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED |
spark.conf.set() o spark.conf.unset() è stato chiamato all'interno di una funzione decorata da un decorator di pipeline. Questa opzione non è supportata con una versione dell'ambiente. |
| Mutazioni della configurazione di Spark | SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.set() è stato chiamato all'esterno di una funzione decorata da un elemento decorator di pipeline dopo la creazione di un dataframe. La modifica della configurazione può influire sul dataframe esistente in fase di esecuzione. |
| Mutazioni della configurazione di Spark | UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
spark.conf.unset() è stato chiamato all'esterno di una funzione decorata da un elemento decorator di pipeline dopo la creazione di un dataframe. La modifica della configurazione può influire sul dataframe esistente in fase di esecuzione. |
| Sostituzioni di viste temporanee | REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una visualizzazione temporanea globale è stata sostituita dopo la creazione di un dataframe che lo fa riferimento. La sostituzione può essere riflessa nel dataframe esistente. |
| Sostituzioni di viste temporanee | REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una visualizzazione temporanea è stata sostituita dopo la creazione di un dataframe che lo fa riferimento. La sostituzione può essere riflessa nel dataframe esistente. |
| Mutazioni UDF e UDFTF | OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Una funzione definita dall'utente è stata riregistrata con lo stesso nome dopo la creazione di un dataframe che lo fa riferimento. Il dataframe esistente può usare la nuova definizione di funzione definita dall'utente. |
| Mutazioni UDF e UDFTF | OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR |
Un tipo definito dall'utente è stato registrato nuovamente con lo stesso nome dopo la creazione di un dataframe che lo fa riferimento. Il dataframe esistente può usare la nuova definizione UDTF. |
| Mutazioni UDF e UDFTF | UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Una funzione definita dall'utente fa riferimento a una variabile di Python modificabile globale. Con una versione dell'ambiente, la funzione definita dall'utente usa il valore della variabile al momento della definizione della funzione definita dall'utente, non in fase di chiamata. |
| Mutazioni UDF e UDFTF | UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR |
Un tipo definito dall'utente fa riferimento a una variabile modificabile globale Python. Con una versione dell'ambiente, UDTF usa il valore della variabile al momento della definizione del tipo definito dall'utente, non in fase di chiamata. |
Mutazioni del database e del catalogo
Questi problemi vengono generati quando il codice della pipeline modifica il database o il catalogo predefinito. Con una versione dell'ambiente, i dataframe costruiti prima della mutazione possono risolvere le tabelle usando il nuovo database o il catalogo.
Modello di esempio che attiva un evento:
from pyspark import pipelines as dp
spark.sql("USE CATALOG marketing")
df = spark.read.table("events")
spark.sql("USE CATALOG sales") # changes the default catalog after df was created
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Senza una versione dell'ambiente, df viene events risolto dal marketing catalogo. Con una versione dell'ambiente, df viene events risolto dal sales catalogo.
Correzione suggerita: Qualificare completamente i nomi di tabella in modo che la risoluzione non dipende dal catalogo o dal database predefinito ed evitare di modificare il catalogo predefinito o il database tra la creazione e l'uso del dataframe.
from pyspark import pipelines as dp
df = spark.read.table("marketing.default.events")
@dp.materialized_view
def events_summary():
return df.groupBy("region").count()
Mutazioni della configurazione di Spark
Questi problemi vengono generati quando il codice della pipeline modifica la configurazione di Spark in modi che possono modificare il comportamento del dataframe in una versione dell'ambiente.
Modello di esempio che attiva un evento:
from pyspark import pipelines as dp
df = spark.read.table("events")
spark.conf.set("spark.sql.ansi.enabled", "true") # changes session conf after df was created
@dp.materialized_view
def events_strict():
return df.selectExpr("CAST(price AS INT) AS price")
Senza una versione dell'ambiente, il cast usa il valore conf in fase di creazione del dataframe. Con una versione dell'ambiente, il cast usa spark.sql.ansi.enabled=true e potrebbe non riuscire in input non valido.
Correzione suggerita: Impostare tutte le configurazioni Spark necessarie nella parte superiore del file della pipeline, prima che venga creato un dataframe. Per la configurazione per query, usare l'impostazione della configuration pipeline nella specifica della pipeline.
Sostituzioni di viste temporanee
Questi problemi vengono generati quando il codice della pipeline sostituisce una visualizzazione temporanea dopo la creazione di un dataframe che fa riferimento. Con una versione dell'ambiente, il dataframe esistente può riflettere il nuovo contenuto della visualizzazione.
Modello di esempio che attiva un evento:
from pyspark import pipelines as dp
spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
df = spark.sql("SELECT * FROM my_view")
spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
.createOrReplaceTempView("my_view")
@dp.materialized_view
def mytable():
return df
Senza una versione dell'ambiente, mytable contiene [(1, "Original Row")]. Con una versione dell'ambiente, mytable contiene [(2, "Replaced Row")].
Correzione suggerita: Creare ogni visualizzazione temporanea una sola volta e non sostituirla. Se sono necessarie più visualizzazioni con dati correlati, assegnare a ognuno un nome distinto.
Mutazioni UDF e UDFTF
Questi problemi vengono generati quando il codice della pipeline modifica una funzione definita dall'utente o un tipo definito dall'utente in modi che modificano il comportamento in una versione dell'ambiente.
Modello di esempio che attiva un evento:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf
suffix = "a"
@udf
def my_udf(s):
return s + suffix
suffix = "b"
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))
Senza una versione dell'ambiente, my_mv contiene [("alex_b",)]. Con una versione dell'ambiente, my_mv contiene [("alex_a",)].
Suggested fix: Passare i valori nella funzione definita dall'utente come argomenti anziché acquisire tali valori da Python globali o impostare il valore globale prima di definire la funzione definita dall'utente e non modificarla in seguito.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf
@udf
def append_suffix(s, suffix):
return s + suffix
@dp.materialized_view
def my_mv():
return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))
Esecuzione eager all'interno di funzioni di flusso
Questi problemi vengono generati quando il codice della pipeline esegue un comando Spark eager all'interno di una funzione decorata da un decorator di pipeline (@table, @materialized_viewe così via). Le funzioni di flusso devono definire e restituire un dataframe; I comandi eager che scrivono dati, gestiscono query di streaming, registrano risorse o eseguono operazioni di Machine Learning non sono consentiti all'interno di una funzione di flusso con un set di versioni dell'ambiente.
Correzione suggerita: Spostare l'operazione eager all'esterno della funzione di flusso e restituire un dataframe dalla funzione di flusso. Gli effetti collaterali, ad esempio la scrittura in una tabella o l'avvio di una query di streaming, appartengono all'esterno della definizione della pipeline; il motore della pipeline gestisce la materializzazione del dataframe restituito dalla funzione di flusso.
Trovare gli eventi di compatibilità nel registro eventi
La query seguente restituisce tutti gli eventi di compatibilità per una pipeline, ordinati prima di tutto:
SELECT
timestamp,
message,
details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
ORDER BY timestamp DESC;
Per contare gli eventi in base al codice del problema negli aggiornamenti recenti:
SELECT
details:behavior_change_in_spark_connect:issue AS issue,
COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;
Per informazioni su come eseguire query sul registro eventi, vedere Eseguire query nel registro eventi.
Vedere anche
- Configurare le versioni dell'ambiente per le pipeline : panoramica delle funzionalità, come abilitare una versione dell'ambiente.
- Schema del registro eventi della pipeline: schema completo del registro eventi della pipeline.
- Log eventi della pipeline: come eseguire query sul registro eventi della pipeline.