Monitorare e osservare il caricatore automatico

Le pipeline di Auto Loader richiedono un monitoraggio attivo per rilevare problemi come backlog crescenti, deriva dello schema, dati corrotti e flussi bloccati prima che influiscano sui sistemi downstream. Questa pagina descrive come monitorare le metriche chiave, eseguire query sullo stato a livello di file, creare dashboard di osservabilità e risolvere i problemi comuni.

Per informazioni dettagliate sulla configurazione di produzione, vedere Configurare il caricatore automatico per i carichi di lavoro di produzione.

Prerequisiti

Diversi flussi di monitoraggio in questa pagina si basano su cloud_files_state() per monitorare lo stato di acquisizione dei singoli file, incluse le query sul backlog, i calcoli della latenza e il rilevamento delle variazioni dello schema. cloud_files_state() è una funzione con valori di tabella che restituisce lo stato di inserimento a livello di file per un checkpoint del caricatore automatico. Non tutti i relativi campi sono disponibili per impostazione predefinita. La disponibilità dipende dalla versione e dalla configurazione di Databricks Runtime:

  • Databricks Runtime 18.2 e versioni successive: discovery_time, processed_timee commit_time sono disponibili automaticamente. In Databricks Runtime 16.4–18.1 questi campi sono disponibili solo quando cloudFiles.cleanSource è abilitato.
  • Databricks Runtime 16.4 e versioni successive con cloudFiles.cleanSource abilitato: archive_time, archive_modee move_location sono disponibili.

L'abilitazione di cloudFiles.cleanSource comporta un certo sovraccarico in termini di prestazioni. Valuta le prestazioni dei tuoi carichi di lavoro in un ambiente di pre-produzione prima di abilitarlo in produzione.

Inoltre:

  • Annotare i dati inseriti con la _metadata colonna . Acquisire almeno file_path e file_modification_time. Vedere Colonna dei metadati dei file.
  • Abilitare _rescued_data e _corrupt_record colonne.

Metriche chiave del caricatore automatico

La tabella seguente riepiloga le metriche più importanti da monitorare per le pipeline del caricatore automatico. Queste metriche sono disponibili dagli eventi di avanzamento StreamingQueryListener, con valori specifici di Auto Loader esposti nella mappa metrics di ogni origine.

Metrica Cosa ti dice
numFilesOutstanding Numero di file nel backlog in attesa di elaborazione
numBytesOutstanding Dimensione della coda di file in byte
approximateQueueSize Profondità della coda cloud (solo in modalità notifica file)
numInputRows Righe elaborate per ogni batch
inputRowsPerSecond Tariffa di arrivo dei dati
processedRowsPerSecond Capacità di elaborazione
durationMs Crollo Dove viene impiegato il tempo in ogni batch

A cosa prestare attenzione

Gli schemi che seguono indicano che la pipeline potrebbe richiedere attenzione.

  • In aumento numFilesOutstanding: il backlog si sta accumulando. La pipeline non riesce a tenere il passo con i dati in ingresso.
  • processedRowsPerSecond < inputRowsPerSecond: La pipeline sta elaborando i dati più lentamente di quanto arrivino.
  • Large durationMs.latestOffset: l'individuazione dei file è lenta. Valutare il passaggio agli eventi file.
  • Large durationMs.addBatch: l'elaborazione dei dati è lenta. Valutare la possibilità di ridimensionare le risorse di calcolo o ottimizzare le trasformazioni.

Per informazioni di riferimento sulle metriche complete, vedere Metriche di origine del caricatore automatico.

Eseguire query sullo stato a livello di file con cloud_files_state

La cloud_files_state() funzione con valori di tabella fornisce informazioni dettagliate su ogni file individuato dal caricatore automatico. Sono disponibili i campi seguenti. I campi contrassegnati come che richiedono Databricks Runtime 16.4 e versioni successive o 18.2 e versioni successive vengono popolati solo nelle condizioni descritte in Prerequisiti.

Campo TIPO Descrzione
path STRING Percorso del file
size BIGINT Dimensioni del file in byte
create_time TIMESTAMP Quando è stato creato il file
discovery_time TIMESTAMP Quando il caricatore automatico ha individuato il file (Databricks Runtime 16.4 e versioni successive)
processed_time TIMESTAMP Quando il caricatore automatico ha elaborato il file (Databricks Runtime 16.4 e versioni successive)
commit_time TIMESTAMP Quando è stato effettuato il commit del file nel checkpoint (Databricks Runtime 16.4 e versioni successive)
archive_time TIMESTAMP Quando il file è stato archiviato (richiede cloudFiles.cleanSource)
archive_mode STRING MOVE, DELETE, o NULL (richiede cloudFiles.cleanSource)
move_location STRING Percorso di destinazione quando cloudFiles.cleanSource è MOVE
ingestion_state STRING Stato di inserimento file corrente

Verifica lo stato di acquisizione del file

Le query seguenti riguardano scenari di diagnostica comuni.

Trovare tutti i file non elaborati (il backlog corrente):

SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

Calcola la latenza media di acquisizione (tempo dalla creazione del file al commit):

SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

Trovare file danneggiati o ignorati:

SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

Tenere traccia dello stato di avanzamento dell'archiviazione (richiede cloudFiles.cleanSource):

SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

Trova i file con elevata latenza dall'individuazione al commit per identificare i colli di bottiglia:

SELECT
  path,
  size,
  unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
  unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

Per informazioni di riferimento complete su SQL, vedere cloud_files_state funzione con valori di tabella.

Monitorare il caricatore automatico nelle pipeline dichiarative di Lakeflow Spark

Databricks consiglia di utilizzare le pipeline dichiarative Spark di Lakeflow per le pipeline Auto Loader in produzione. Per sfruttare le funzionalità di monitoraggio predefinite:

  • Archiviare il log eventi di Lakeflow Spark Declarative Pipelines in una tabella Delta in modo che possa essere eseguita una query per ottenere dati di osservabilità. Configurare questa operazione tramite le impostazioni avanzate della pipeline o l'API. Per informazioni dettagliate, vedere Registro eventi della pipeline.

  • Strutturare la pipeline per l'osservabilità. Una pipeline di caricamento automatico ben strutturata nelle pipeline dichiarative di Lakeflow Spark include una {table}_source vista (definizione di origine del caricatore automatico), una {table}_bronze tabella di streaming (inserimento di dati non elaborati con _rescued_data e _corrupt_record colonne), una corrupt_records_sink che mette in quarantena le righe con dati non analizzabili e una {table} visualizzazione pulita per l'utilizzo downstream.

  • Impostare le aspettative sulle tabelle di streaming bronze per monitorare la deriva dello schema e il danneggiamento dei dati. _rescued_data IS NULL rileva modifiche impreviste dello schema e _corrupt_record IS NULL rileva dati non analizzabili. Le pipeline dichiarative di Lakeflow Spark valutano queste aspettative man mano che arrivano i dati e generano un trail di osservabilità. È possibile configurare le aspettative per avvisare, eliminare righe o interrompere la pipeline.

Dopo aver creato la vista event_log_raw della pipeline, utilizza le query seguenti per le metriche specifiche di Auto Loader.

Monitorare la velocità effettiva di inserimento per flusso:

SELECT
  origin.flow_name,
  origin.update_id,
  timestamp,
  TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Monitorare il backlog dei dati per flusso:

SELECT
  origin.flow_name,
  timestamp,
  DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

Riepilogare le violazioni delle aspettative per rilevare la deriva dello schema e i dati danneggiati:

SELECT
  origin.flow_name,
  explode(from_json(
    details:flow_progress.data_quality.expectations,
    'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
  )) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality.expectations IS NOT NULL;

Per indicazioni generali sul monitoraggio delle pipeline dichiarative di Lakeflow Spark, vedere Monitorare le pipeline e il log eventi della pipeline.

Monitorare il caricatore automatico con Structured Streaming

Quando esegui Auto Loader al di fuori di Lakeflow Declarative Pipelines per Spark, utilizza i seguenti approcci di monitoraggio di Structured Streaming.

  • Implementa un StreamingQueryListener per acquisire le metriche specifiche di Auto Loader da ciascun batch leggendo da source.metrics.
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            if "CloudFilesSource" in source.description:
                metrics = source.metrics
                files_outstanding = metrics.get("numFilesOutstanding", "0")
                bytes_outstanding = metrics.get("numBytesOutstanding", "0")
                rows_per_sec = source.processedRowsPerSecond
                # Push metrics to your monitoring system (for example, write to a Delta table)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(AutoLoaderMonitor())

Annotazioni

La logica di elaborazione nei listener può rallentare l'elaborazione delle query. Limitate il calcolo nei callback del listener ed evitate scritture esterne sincrone; inviate invece dati di telemetria leggeri in modo asincrono o delegate le metriche a un processo separato per la persistenza.

  • Usare numInputRows, inputRowsPerSeconde processedRowsPerSecond dallo stato di avanzamento dell'origine per calcolare la velocità effettiva, ovvero i file al secondo e le righe al secondo per ogni batch.

  • Per calcolare la latenza di ingestione, confrontare create_time e commit_time in cloud_files_state() per la latenza end-to-end. Per la latenza di elaborazione, utilizzare il dettaglio durationMs (ad esempio, latestOffset, addBatch e altre fasi batch riportate) per identificare quale fase costituisce il collo di bottiglia.

  • Usare df.observe() per definire le metriche della qualità dei dati inline direttamente nel dataframe di streaming. Le metriche sono visibili negli eventi di avanzamento in StreamingQueryListener sotto observedMetrics.

from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
    "auto_loader_quality",
    count(lit(1)).alias("total_rows"),
    count(col("_rescued_data")).alias("rescued_rows"),
    count(col("_corrupt_record")).alias("corrupt_rows")
)
  • Usare .queryName() per assegnare un nome univoco a ogni flusso, semplificando la distinzione dei flussi del caricatore automatico nella scheda Streaming dell'interfaccia utente Spark e nei dashboard di monitoraggio.

Per il riferimento completo sul monitoraggio di Structured Streaming, vedere Monitoraggio delle query di Structured Streaming in Azure Databricks.

Creare un dashboard di osservabilità

Combina i dati provenienti da più fonti per creare una dashboard di osservabilità completa per le pipeline di Auto Loader. Questa tabella visualizza alcune origini suggerite che è possibile usare per strutturare il dashboard di osservabilità.

L'origine dei dati Dati di osservabilità
cloud_files_state() Stato di inserimento a livello di file: individuazione, elaborazione, commit e timestamp di archiviazione per ogni file
Registro eventi di Lakeflow Spark Declarative Pipelines Cronologia di esecuzione della pipeline, metriche del flusso per batch e risultati delle aspettative sulla qualità dei dati
Tabelle dei risultati della pipeline Conteggi delle righe e volume di dati scritti per tabella inserita

È quindi possibile aggregare i dati di osservabilità in tabelle dedicate che fungono da base per dashboard e avvisi:

  • Riepilogare gli stati di esecuzione della pipeline (esito positivo o negativo) nel tempo, derivati dagli event_type = 'update_progress' eventi.
  • Metriche aggregate di acquisizione dei file (dimensione del backlog, capacità effettiva, latenza per batch), ricavate dagli eventi cloud_files_state() e event_type = 'flow_progress'.
  • Sviluppare statistiche di tabella usando i conteggi delle righe e il volume di dati per ogni tabella, derivato da num_output_rows nel registro eventi.
  • Raccolta di informazioni di debug dai log dettagliati degli errori e dalle violazioni delle condizioni previste per ogni aggiornamento, derivate da eventi event_type = 'flow_progress' con data_quality valorizzato.

Queste tabelle aggregate possono alimentare un dashboard di intelligenza artificiale/BI e avvisi SQL. I pannelli del dashboard consigliati includono sequenza temporale dello stato di esecuzione della pipeline, tendenza del backlog di inserimento, tendenza della velocità effettiva, distribuzione della latenza di inserimento, metriche di qualità dei dati, eventi di evoluzione dello schema e stato di archiviazione dei file.

Monitorare gli eventi di evoluzione dello schema

Usare gli approcci seguenti per rilevare le modifiche dello schema man mano che si verificano.

  • I valori non NULL in _rescued_data nei conteggi delle violazioni delle aspettative indicano una deriva dello schema. Interrogare il registro eventi per failed_records > 0 in base all'aspettativa no rescued data.
  • Le modifiche alla _schemas directory all'interno della directory configurata cloudFiles.schemaLocation (o all'interno del checkpoint solo quando il percorso dello schema non è impostato separatamente) indicano che si è verificata l'evoluzione dello schema. È possibile monitorare questa directory da un job di monitoraggio separato.
  • Non considerare un evento onQueryTerminated seguito da onQueryStarted per lo stesso nome di stream come prova sufficiente di per sé di un'evoluzione dello schema. I flussi vengono riavviati per molti motivi (riavvii del cluster, distribuzioni del codice, errori di archiviazione temporanei). Correlare i riavvii con segnali indipendenti — _schemas modifiche alla directory o _rescued_data violazioni delle attese — prima di concludere che si sia verificata un'evoluzione dello schema.
  • Usare _metadata.file_path per identificare i file che hanno introdotto le modifiche dello schema. Unisci questo con cloud_files_state() sul campo path per correlare le modifiche dello schema con file e batch specifici.

Usare questa query di esempio per rilevare la deriva recente dello schema tramite violazioni delle aspettative:

SELECT
  timestamp,
  origin.flow_name,
  exp.name AS expectation_name,
  exp.failed_records
FROM (
  SELECT
    timestamp,
    origin,
    explode(from_json(
      details:flow_progress.data_quality.expectations,
      'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
    )) AS exp
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
  AND exp.failed_records > 0
ORDER BY timestamp DESC;

Configurare gli avvisi per i problemi comuni

Usare gli avvisi o le notifiche della pipeline di Databricks SQL per rilevare i problemi prima che influiscano sui consumer downstream.

Il codice SQL seguente rileva un backlog in crescita e può essere usato come base per un avviso SQL di Databricks. Pianificala in modo che venga eseguita periodicamente (ad esempio, ogni 5 minuti) e invia un avviso se il risultato non è vuoto.

-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
  SELECT
    origin.flow_name,
    timestamp,
    DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
  AND backlog_bytes > 1073741824  -- alert when backlog exceeds 1 GB

La tabella seguente riepiloga le condizioni di avviso consigliate:

Cosa rilevare Come rilevarlo Quando inviare un avviso
Backlog in crescita numFilesOutstanding tendenza verso l'alto Aumento costante in più batch
Flusso bloccato Nessun evento di avanzamento Nessun evento per N minuti (in base all'intervallo di trigger previsto)
Latenza di acquisizione elevata commit_time - create_time Supera la soglia dell'SLA
Riduzione della qualità dei dati Frequenza di errore delle aspettative Percentuale crescente di righe che non soddisfano le aspettative
Evento di evoluzione dello schema _rescued_data IS NOT NULL Qualsiasi valore non NULL nel conteggio delle violazioni previsto
Rilevamento dei file lento durationMs.latestOffset Significativamente superiore alla linea di base

Risolvere i problemi comuni

La tabella seguente descrive i problemi comuni relativi alla pipeline del caricatore automatico, le cause probabili e le azioni consigliate per risolverli.

Issue Possibile causa Azione consigliata
Aumento del backlog più veloce rispetto all'elaborazione Calcolo sottodimensionato, asimmetria dei dati o limiti di velocità limitata Aumentare o ridurre le risorse di calcolo, verificare eventuali squilibri con la UI di Spark ed esaminare le impostazioni maxFilesPerTrigger per controllare la dimensione del batch
File non individuati Eventi di file non configurati correttamente, problemi di autorizzazioni o flusso non eseguiti entro 7 giorni Verifica le autorizzazioni per la posizione esterna, controlla la configurazione degli eventi dei file nell'interfaccia utente di Unity Catalog e assicurati che lo stream venga eseguito almeno ogni 7 giorni per evitare la scadenza dello stato in RocksDB
L'avvio del flusso richiede troppo tempo Download di grandi dimensioni dello stato del checkpoint (RocksDB) Eseguire l'aggiornamento a Databricks Runtime 15.3 e versioni successive per il caricamento dello stato asincrono, riducendo il tempo di avvio di circa 90%
Elaborazione di file duplicati Impostazioni aggressive cloudFiles.maxFileAge o danneggiamento del checkpoint Usare un criterio conservativo maxFileAge (almeno 90 giorni), verificare l'integrità del checkpoint ed evitare i criteri relativi al ciclo di vita nell'archiviazione dei checkpoint
Evoluzione dello schema che provoca il riavvio della pipeline Modifiche frequenti o incompatibili dello schema Esaminare schemaEvolutionMode, passare a addNewColumnsWithTypeWidening per promozioni di tipo o usare il tipo Variant per schemi altamente dinamici
Dati danneggiati accumulati nel sink Problemi di qualità dei dati di origine Controllare il sink _corrupt_record di quarantena per individuare eventuali pattern, esaminare la modalità di generazione dei dati di origine e prendere in considerazione l'aggiunta della convalida a monte
discovery_time e commit_time non popolati In esecuzione su Databricks Runtime versione precedente alla 18.2 senza cleanSource Eseguire l'aggiornamento a Databricks Runtime 18.2 e versioni successive o abilitare cloudFiles.cleanSource in Databricks Runtime dalla versione 16.4 alla 18.1

Per altre informazioni sulla risoluzione dei problemi, vedere Domande frequenti sul caricatore automatico.