Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Le pipeline di Auto Loader richiedono un monitoraggio attivo per rilevare problemi come backlog crescenti, deriva dello schema, dati corrotti e flussi bloccati prima che influiscano sui sistemi downstream. Questa pagina descrive come monitorare le metriche chiave, eseguire query sullo stato a livello di file, creare dashboard di osservabilità e risolvere i problemi comuni.
Per informazioni dettagliate sulla configurazione di produzione, vedere Configurare il caricatore automatico per i carichi di lavoro di produzione.
Prerequisiti
Diversi flussi di monitoraggio in questa pagina si basano su cloud_files_state() per monitorare lo stato di acquisizione dei singoli file, incluse le query sul backlog, i calcoli della latenza e il rilevamento delle variazioni dello schema.
cloud_files_state() è una funzione con valori di tabella che restituisce lo stato di inserimento a livello di file per un checkpoint del caricatore automatico. Non tutti i relativi campi sono disponibili per impostazione predefinita. La disponibilità dipende dalla versione e dalla configurazione di Databricks Runtime:
-
Databricks Runtime 18.2 e versioni successive:
discovery_time,processed_timeecommit_timesono disponibili automaticamente. In Databricks Runtime 16.4–18.1 questi campi sono disponibili solo quandocloudFiles.cleanSourceè abilitato. -
Databricks Runtime 16.4 e versioni successive con
cloudFiles.cleanSourceabilitato:archive_time,archive_modeemove_locationsono disponibili.
L'abilitazione di cloudFiles.cleanSource comporta un certo sovraccarico in termini di prestazioni. Valuta le prestazioni dei tuoi carichi di lavoro in un ambiente di pre-produzione prima di abilitarlo in produzione.
Inoltre:
- Annotare i dati inseriti con la
_metadatacolonna . Acquisire almenofile_pathefile_modification_time. Vedere Colonna dei metadati dei file. - Abilitare
_rescued_datae_corrupt_recordcolonne.
Metriche chiave del caricatore automatico
La tabella seguente riepiloga le metriche più importanti da monitorare per le pipeline del caricatore automatico. Queste metriche sono disponibili dagli eventi di avanzamento StreamingQueryListener, con valori specifici di Auto Loader esposti nella mappa metrics di ogni origine.
| Metrica | Cosa ti dice |
|---|---|
numFilesOutstanding |
Numero di file nel backlog in attesa di elaborazione |
numBytesOutstanding |
Dimensione della coda di file in byte |
approximateQueueSize |
Profondità della coda cloud (solo in modalità notifica file) |
numInputRows |
Righe elaborate per ogni batch |
inputRowsPerSecond |
Tariffa di arrivo dei dati |
processedRowsPerSecond |
Capacità di elaborazione |
durationMs Crollo |
Dove viene impiegato il tempo in ogni batch |
A cosa prestare attenzione
Gli schemi che seguono indicano che la pipeline potrebbe richiedere attenzione.
-
In aumento
numFilesOutstanding: il backlog si sta accumulando. La pipeline non riesce a tenere il passo con i dati in ingresso. -
processedRowsPerSecond<inputRowsPerSecond: La pipeline sta elaborando i dati più lentamente di quanto arrivino. -
Large
durationMs.latestOffset: l'individuazione dei file è lenta. Valutare il passaggio agli eventi file. -
Large
durationMs.addBatch: l'elaborazione dei dati è lenta. Valutare la possibilità di ridimensionare le risorse di calcolo o ottimizzare le trasformazioni.
Per informazioni di riferimento sulle metriche complete, vedere Metriche di origine del caricatore automatico.
Eseguire query sullo stato a livello di file con cloud_files_state
La cloud_files_state() funzione con valori di tabella fornisce informazioni dettagliate su ogni file individuato dal caricatore automatico. Sono disponibili i campi seguenti. I campi contrassegnati come che richiedono Databricks Runtime 16.4 e versioni successive o 18.2 e versioni successive vengono popolati solo nelle condizioni descritte in Prerequisiti.
| Campo | TIPO | Descrzione |
|---|---|---|
path |
STRING |
Percorso del file |
size |
BIGINT |
Dimensioni del file in byte |
create_time |
TIMESTAMP |
Quando è stato creato il file |
discovery_time |
TIMESTAMP |
Quando il caricatore automatico ha individuato il file (Databricks Runtime 16.4 e versioni successive) |
processed_time |
TIMESTAMP |
Quando il caricatore automatico ha elaborato il file (Databricks Runtime 16.4 e versioni successive) |
commit_time |
TIMESTAMP |
Quando è stato effettuato il commit del file nel checkpoint (Databricks Runtime 16.4 e versioni successive) |
archive_time |
TIMESTAMP |
Quando il file è stato archiviato (richiede cloudFiles.cleanSource) |
archive_mode |
STRING |
MOVE, DELETE, o NULL (richiede cloudFiles.cleanSource) |
move_location |
STRING |
Percorso di destinazione quando cloudFiles.cleanSource è MOVE |
ingestion_state |
STRING |
Stato di inserimento file corrente |
Verifica lo stato di acquisizione del file
Le query seguenti riguardano scenari di diagnostica comuni.
Trovare tutti i file non elaborati (il backlog corrente):
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
Calcola la latenza media di acquisizione (tempo dalla creazione del file al commit):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
Trovare file danneggiati o ignorati:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
Tenere traccia dello stato di avanzamento dell'archiviazione (richiede cloudFiles.cleanSource):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
Trova i file con elevata latenza dall'individuazione al commit per identificare i colli di bottiglia:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
Per informazioni di riferimento complete su SQL, vedere cloud_files_state funzione con valori di tabella.
Monitorare il caricatore automatico nelle pipeline dichiarative di Lakeflow Spark
Databricks consiglia di utilizzare le pipeline dichiarative Spark di Lakeflow per le pipeline Auto Loader in produzione. Per sfruttare le funzionalità di monitoraggio predefinite:
Archiviare il log eventi di Lakeflow Spark Declarative Pipelines in una tabella Delta in modo che possa essere eseguita una query per ottenere dati di osservabilità. Configurare questa operazione tramite le impostazioni avanzate della pipeline o l'API. Per informazioni dettagliate, vedere Registro eventi della pipeline.
Strutturare la pipeline per l'osservabilità. Una pipeline di caricamento automatico ben strutturata nelle pipeline dichiarative di Lakeflow Spark include una
{table}_sourcevista (definizione di origine del caricatore automatico), una{table}_bronzetabella di streaming (inserimento di dati non elaborati con_rescued_datae_corrupt_recordcolonne), unacorrupt_records_sinkche mette in quarantena le righe con dati non analizzabili e una{table}visualizzazione pulita per l'utilizzo downstream.Impostare le aspettative sulle tabelle di streaming bronze per monitorare la deriva dello schema e il danneggiamento dei dati.
_rescued_data IS NULLrileva modifiche impreviste dello schema e_corrupt_record IS NULLrileva dati non analizzabili. Le pipeline dichiarative di Lakeflow Spark valutano queste aspettative man mano che arrivano i dati e generano un trail di osservabilità. È possibile configurare le aspettative per avvisare, eliminare righe o interrompere la pipeline.
Dopo aver creato la vista event_log_raw della pipeline, utilizza le query seguenti per le metriche specifiche di Auto Loader.
Monitorare la velocità effettiva di inserimento per flusso:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Monitorare il backlog dei dati per flusso:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
Riepilogare le violazioni delle aspettative per rilevare la deriva dello schema e i dati danneggiati:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
Per indicazioni generali sul monitoraggio delle pipeline dichiarative di Lakeflow Spark, vedere Monitorare le pipeline e il log eventi della pipeline.
Monitorare il caricatore automatico con Structured Streaming
Quando esegui Auto Loader al di fuori di Lakeflow Declarative Pipelines per Spark, utilizza i seguenti approcci di monitoraggio di Structured Streaming.
- Implementa un
StreamingQueryListenerper acquisire le metriche specifiche di Auto Loader da ciascun batch leggendo dasource.metrics.
from pyspark.sql.streaming import StreamingQueryListener
class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass
def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)
def onQueryIdle(self, event):
pass
def onQueryTerminated(self, event):
pass
spark.streams.addListener(AutoLoaderMonitor())
Annotazioni
La logica di elaborazione nei listener può rallentare l'elaborazione delle query. Limitate il calcolo nei callback del listener ed evitate scritture esterne sincrone; inviate invece dati di telemetria leggeri in modo asincrono o delegate le metriche a un processo separato per la persistenza.
Usare
numInputRows,inputRowsPerSecondeprocessedRowsPerSeconddallo stato di avanzamento dell'origine per calcolare la velocità effettiva, ovvero i file al secondo e le righe al secondo per ogni batch.Per calcolare la latenza di ingestione, confrontare
create_timeecommit_timeincloud_files_state()per la latenza end-to-end. Per la latenza di elaborazione, utilizzare il dettagliodurationMs(ad esempio,latestOffset,addBatche altre fasi batch riportate) per identificare quale fase costituisce il collo di bottiglia.Usare
df.observe()per definire le metriche della qualità dei dati inline direttamente nel dataframe di streaming. Le metriche sono visibili negli eventi di avanzamento inStreamingQueryListenersottoobservedMetrics.
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
- Usare
.queryName()per assegnare un nome univoco a ogni flusso, semplificando la distinzione dei flussi del caricatore automatico nella scheda Streaming dell'interfaccia utente Spark e nei dashboard di monitoraggio.
Per il riferimento completo sul monitoraggio di Structured Streaming, vedere Monitoraggio delle query di Structured Streaming in Azure Databricks.
Creare un dashboard di osservabilità
Combina i dati provenienti da più fonti per creare una dashboard di osservabilità completa per le pipeline di Auto Loader. Questa tabella visualizza alcune origini suggerite che è possibile usare per strutturare il dashboard di osservabilità.
| L'origine dei dati | Dati di osservabilità |
|---|---|
cloud_files_state() |
Stato di inserimento a livello di file: individuazione, elaborazione, commit e timestamp di archiviazione per ogni file |
| Registro eventi di Lakeflow Spark Declarative Pipelines | Cronologia di esecuzione della pipeline, metriche del flusso per batch e risultati delle aspettative sulla qualità dei dati |
| Tabelle dei risultati della pipeline | Conteggi delle righe e volume di dati scritti per tabella inserita |
È quindi possibile aggregare i dati di osservabilità in tabelle dedicate che fungono da base per dashboard e avvisi:
- Riepilogare gli stati di esecuzione della pipeline (esito positivo o negativo) nel tempo, derivati dagli
event_type = 'update_progress'eventi. - Metriche aggregate di acquisizione dei file (dimensione del backlog, capacità effettiva, latenza per batch), ricavate dagli eventi
cloud_files_state()eevent_type = 'flow_progress'. - Sviluppare statistiche di tabella usando i conteggi delle righe e il volume di dati per ogni tabella, derivato da
num_output_rowsnel registro eventi. - Raccolta di informazioni di debug dai log dettagliati degli errori e dalle violazioni delle condizioni previste per ogni aggiornamento, derivate da eventi
event_type = 'flow_progress'condata_qualityvalorizzato.
Queste tabelle aggregate possono alimentare un dashboard di intelligenza artificiale/BI e avvisi SQL. I pannelli del dashboard consigliati includono sequenza temporale dello stato di esecuzione della pipeline, tendenza del backlog di inserimento, tendenza della velocità effettiva, distribuzione della latenza di inserimento, metriche di qualità dei dati, eventi di evoluzione dello schema e stato di archiviazione dei file.
Monitorare gli eventi di evoluzione dello schema
Usare gli approcci seguenti per rilevare le modifiche dello schema man mano che si verificano.
- I valori non NULL in
_rescued_datanei conteggi delle violazioni delle aspettative indicano una deriva dello schema. Interrogare il registro eventi perfailed_records > 0in base all'aspettativano rescued data. - Le modifiche alla
_schemasdirectory all'interno della directory configuratacloudFiles.schemaLocation(o all'interno del checkpoint solo quando il percorso dello schema non è impostato separatamente) indicano che si è verificata l'evoluzione dello schema. È possibile monitorare questa directory da un job di monitoraggio separato. - Non considerare un evento
onQueryTerminatedseguito daonQueryStartedper lo stesso nome di stream come prova sufficiente di per sé di un'evoluzione dello schema. I flussi vengono riavviati per molti motivi (riavvii del cluster, distribuzioni del codice, errori di archiviazione temporanei). Correlare i riavvii con segnali indipendenti —_schemasmodifiche alla directory o_rescued_dataviolazioni delle attese — prima di concludere che si sia verificata un'evoluzione dello schema. - Usare
_metadata.file_pathper identificare i file che hanno introdotto le modifiche dello schema. Unisci questo concloud_files_state()sul campopathper correlare le modifiche dello schema con file e batch specifici.
Usare questa query di esempio per rilevare la deriva recente dello schema tramite violazioni delle aspettative:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
Configurare gli avvisi per i problemi comuni
Usare gli avvisi o le notifiche della pipeline di Databricks SQL per rilevare i problemi prima che influiscano sui consumer downstream.
Il codice SQL seguente rileva un backlog in crescita e può essere usato come base per un avviso SQL di Databricks. Pianificala in modo che venga eseguita periodicamente (ad esempio, ogni 5 minuti) e invia un avviso se il risultato non è vuoto.
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB
La tabella seguente riepiloga le condizioni di avviso consigliate:
| Cosa rilevare | Come rilevarlo | Quando inviare un avviso |
|---|---|---|
| Backlog in crescita |
numFilesOutstanding tendenza verso l'alto |
Aumento costante in più batch |
| Flusso bloccato | Nessun evento di avanzamento | Nessun evento per N minuti (in base all'intervallo di trigger previsto) |
| Latenza di acquisizione elevata | commit_time - create_time |
Supera la soglia dell'SLA |
| Riduzione della qualità dei dati | Frequenza di errore delle aspettative | Percentuale crescente di righe che non soddisfano le aspettative |
| Evento di evoluzione dello schema | _rescued_data IS NOT NULL |
Qualsiasi valore non NULL nel conteggio delle violazioni previsto |
| Rilevamento dei file lento | durationMs.latestOffset |
Significativamente superiore alla linea di base |
Risolvere i problemi comuni
La tabella seguente descrive i problemi comuni relativi alla pipeline del caricatore automatico, le cause probabili e le azioni consigliate per risolverli.
| Issue | Possibile causa | Azione consigliata |
|---|---|---|
| Aumento del backlog più veloce rispetto all'elaborazione | Calcolo sottodimensionato, asimmetria dei dati o limiti di velocità limitata | Aumentare o ridurre le risorse di calcolo, verificare eventuali squilibri con la UI di Spark ed esaminare le impostazioni maxFilesPerTrigger per controllare la dimensione del batch |
| File non individuati | Eventi di file non configurati correttamente, problemi di autorizzazioni o flusso non eseguiti entro 7 giorni | Verifica le autorizzazioni per la posizione esterna, controlla la configurazione degli eventi dei file nell'interfaccia utente di Unity Catalog e assicurati che lo stream venga eseguito almeno ogni 7 giorni per evitare la scadenza dello stato in RocksDB |
| L'avvio del flusso richiede troppo tempo | Download di grandi dimensioni dello stato del checkpoint (RocksDB) | Eseguire l'aggiornamento a Databricks Runtime 15.3 e versioni successive per il caricamento dello stato asincrono, riducendo il tempo di avvio di circa 90% |
| Elaborazione di file duplicati | Impostazioni aggressive cloudFiles.maxFileAge o danneggiamento del checkpoint |
Usare un criterio conservativo maxFileAge (almeno 90 giorni), verificare l'integrità del checkpoint ed evitare i criteri relativi al ciclo di vita nell'archiviazione dei checkpoint |
| Evoluzione dello schema che provoca il riavvio della pipeline | Modifiche frequenti o incompatibili dello schema | Esaminare schemaEvolutionMode, passare a addNewColumnsWithTypeWidening per promozioni di tipo o usare il tipo Variant per schemi altamente dinamici |
| Dati danneggiati accumulati nel sink | Problemi di qualità dei dati di origine | Controllare il sink _corrupt_record di quarantena per individuare eventuali pattern, esaminare la modalità di generazione dei dati di origine e prendere in considerazione l'aggiunta della convalida a monte |
discovery_time e commit_time non popolati |
In esecuzione su Databricks Runtime versione precedente alla 18.2 senza cleanSource |
Eseguire l'aggiornamento a Databricks Runtime 18.2 e versioni successive o abilitare cloudFiles.cleanSource in Databricks Runtime dalla versione 16.4 alla 18.1 |
Per altre informazioni sulla risoluzione dei problemi, vedere Domande frequenti sul caricatore automatico.