Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina descrive i concetti del watermarking e contiene raccomandazioni sull'uso dei watermark nelle comuni operazioni di streaming con stato.
Le query di streaming accumulano dati sullo stato nel tempo. Le filigrane rimuovono automaticamente i dati di stato precedenti per evitare errori di memoria e una maggiore latenza di elaborazione.
Che cos'è una filigrana?
Durante l'elaborazione, Structured Streaming mantiene lo stato tra micro batch. Le query di streaming usano lo stato per aggiornare in modo incrementale i risultati anziché ricompilare tutti gli elementi dopo ogni micro batch. I watermark controllano la soglia alla quale una query interrompe l'elaborazione di un'entità di stato.
Esempi comuni di entità di stato includono:
- Aggregazioni in un intervallo di tempo.
- Chiavi univoce in un collegamento tra due flussi.
Per dichiarare un watermark su un DataFrame in streaming, specificare un campo timestamp e una soglia di ritardo. Man mano che arrivano nuovi dati, il gestore di stato tiene traccia del timestamp più recente nel campo specificato ed elabora solo i record entro la soglia di ritardo.
Le query elaborano sempre i record che arrivano entro la soglia. Le query potrebbero comunque elaborare record che arrivano oltre la soglia, ma ciò non è garantito.
Nell'esempio seguente viene applicata una soglia limite di 10 minuti a un conteggio finestrato:
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
In questo esempio:
- La colonna
event_timeviene usata per definire una filigrana di 10 minuti e una finestra a scorrimento fisso di 5 minuti. - Viene raccolto un conteggio per ogni
idosservato per ogni finestra di 5 minuti non sovrapposta. - Le informazioni sullo stato vengono mantenute per ciascun conteggio finché la fine della finestra non è più vecchia di 10 minuti rispetto all'ultimo
event_timeosservato.
Importante
In un'operazione groupBy() e window(), fare riferimento alle colonne per nome, "<colName>" o col("<colName>"), per assicurarsi che il marcatore temporale dell'evento venga mantenuto. In Scala è anche possibile usare $colName.
In che modo le filigrane influiscono sul tempo di elaborazione e sulla velocità effettiva?
Le modalità di output determinano quando una query con watermark scrive i dati nel sink. I watermark sono essenziali per il controllo della velocità di elaborazione nello streaming stateful perché riducono la quantità totale di informazioni di stato in memoria. Non tutte le modalità di output sono supportate per tutte le operazioni con stato. Vedi Watermark e modalità di output per le aggregazioni con finestra.
La selezione di una durata della filigrana presenta compromessi:
- Watermark più brevi riducono la latenza delle query perché le query archiviano meno informazioni di stato e scrivono i risultati al termine di ciascun intervallo di watermark. Tuttavia, i watermark brevi tollerano poco i dati in ritardo.
- Le filigrane più lunghe hanno una tolleranza elevata per i dati in ritardo. Tuttavia, le filigrane lunghe aumentano la latenza delle query perché le query devono archiviare più informazioni sullo stato e attendere la scrittura dei risultati dopo una durata più lunga della filigrana.
Filigrane e modalità di output per le aggregazioni basate su finestre
La tabella seguente illustra il comportamento di elaborazione per le query con aggregazione su un timestamp e una filigrana:
| Modalità output | Comportamento |
|---|---|
| Accodare | La query scrive righe nella tabella di destinazione dopo il superamento della soglia limite. Tutte le scritture vengono ritardate in base alla soglia di ritardo. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
| Aggiornare | La query scrive righe nella tabella di destinazione quando vengono calcolati i risultati e la query può aggiornare e sovrascrivere le righe man mano che arrivano nuovi dati. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia. |
| Completo | Lo stato dell'aggregazione non viene eliminato. La query riscrive la tabella di destinazione per ogni trigger. |
Watermark e le modalità di output per i join stream-stream
I join tra più flussi supportano solo la modalità di accodamento. Le query registrano i record corrispondenti per ogni batch.
Per i join interni, Databricks consiglia di impostare una soglia di watermark per ogni origine dati in streaming, in modo da consentire alla query di scartare le informazioni di stato relative ai record meno recenti. Senza watermark, Structured Streaming tenta di unire ogni chiave su entrambi i lati del join a ogni trigger, operazione che potrebbe influire sulle prestazioni.
Per i join esterni, il watermark è obbligatorio. Quando un record non corrisponde, la query scrive un valore Null per tale chiave. Poiché i join consentono solo la modalità append, i record non corrispondenti non vengono scritti finché non viene raggiunta la soglia di ritardo.
Controllo della soglia dei dati in ritardo con un criterio basato su più watermark
Per più input di Structured Streaming, è possibile impostare più watermark per controllare le soglie di tolleranza per i dati in arrivo in ritardo. Le filigrane consentono di controllare le informazioni sullo stato e la latenza.
Una query di streaming può avere più flussi di input uniti o collegati. Per le operazioni con stato, ognuno dei flussi di input potrebbe richiedere una soglia diversa per la tolleranza ai dati tardiva. Specificare queste soglie usando withWatermark("eventTime", delay) in ogni flusso di input. Di seguito è riportata una query di esempio con join tra flussi.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Durante l'esecuzione della query con operazioni con stato, Structured Streaming tiene traccia singolarmente dell'ora dell'evento massima per ogni flusso di input, calcola i watermark in base al ritardo associato e determina un unico watermark globale. Per impostazione predefinita, Structured Streaming usa il valore minimo come filigrana globale. Se un flusso rimane indietro rispetto agli altri, un watermark globale minimo impedisce alla query di segnalare erroneamente i dati come in ritardo. Ad esempio, ciò può verificarsi quando uno dei flussi smette di ricevere dati a causa di errori upstream. Il watermark globale avanza in sicurezza alla velocità del flusso più lento e, quando necessario, posticipa l'output della query.
Per ridurre la latenza, impostare spark.sql.streaming.multipleWatermarkPolicy su max (il valore predefinito è min) per utilizzare il watermark del flusso più veloce come watermark globale. Tuttavia, questa configurazione elimina i dati dai flussi più lenti. Databricks consiglia di applicare questa configurazione con cautela.
Applicare filigrane a operazioni distinte
L'operazione distinct tiene traccia di ogni record univoco nello stato. Senza un watermark, lo stato cresce indefinitamente e può causare problemi di memoria. Specificare un watermark su un campo timestamp per delimitare lo stato e rimuovere i record vecchi dopo il superamento della soglia.
L'esempio seguente applica una filigrana a un'operazione distinct :
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
In questo esempio, la query di streaming rimuove i record duplicati che arrivano entro 1 ora dall'ultima osservata eventTime. La query scarta le informazioni di stato per la deduplicazione una volta superata la soglia.
Importante
Per deduplicare colonne specifiche anziché tutte le colonne, usare dropDuplicates() o dropDuplicatesWithinWatermark() anziché distinct. Vedere Rimuovi i duplicati all'interno della filigrana.
Eliminare i duplicati all'interno della filigrana
In Databricks Runtime 13.3 LTS o versione successiva è possibile usare un identificatore univoco per deduplicare i record all'interno di una soglia limite.
Structured Streaming garantisce l'elaborazione una e una sola volta, ma non elimina i duplicati dei record provenienti dalle origini dati. Usare dropDuplicatesWithinWatermark per rimuovere i duplicati in qualsiasi campo, anche quando i campi differiscono tra record duplicati, ad esempio l'ora dell'evento o l'ora di arrivo.
Con dropDuplicatesWithinWatermark, le query eliminano sempre i duplicati tra i record che arrivano entro la soglia del watermark. Le query potrebbero anche eliminare i duplicati tra i record che arrivano oltre la soglia, ma ciò non è garantito. Per garantire che le query scartino tutti i duplicati, impostare la soglia del watermark su un valore superiore alla differenza massima tra i timestamp degli eventi duplicati.
È necessario specificare una filigrana per usare il dropDuplicatesWithinWatermark metodo :
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Esempi di casi d'uso
Gli esempi seguenti illustrano casi d'uso avanzati di windowing:
Usare finestre a cascata per calcolare i totali delle vendite orarie
Le finestre a cascata sono a dimensione fissa con intervalli non sovrapposti. Ogni riga di input appartiene esattamente a una finestra. Usare finestre a cascata per calcolare aggregazioni discrete del periodo di tempo, ad esempio i totali delle vendite orarie:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
In questo esempio:
-
window("timestamp", "1 hour")raggruppa gli ordini in intervalli di 1 ora non sovrapposti, ad esempio da 5 a 6:00 e da 6 a 7:00. -
withWatermark("timestamp", "1 hour")mantiene l'aggregazione di ogni finestra nello stato fino a quando il timestamp di fine della finestra non è di 1 ora superiore al timestamp massimo dell'ordine.
Usare finestre scorrevoli per calcolare aggregazioni mobili
Le finestre scorrevoli sono a dimensione fissa con intervalli che possono sovrapporsi. Una singola riga può appartenere a più finestre. Usare finestre scorrevoli per calcolare aggregazioni in sequenza, ad esempio le vendite in un periodo di 6 ore in sequenza:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
In questo esempio:
-
window("timestamp", "6 hours", slideDuration="1 hour")raggruppa gli ordini in intervalli di 6 ore che avanzano di un'ora, ad esempio dalle 5 alle 11 e dalle 6 alle 12.00. -
withWatermark("timestamp", "1 hour")mantiene l'aggregato di ogni finestra nello stato fino a quando il timestamp di fine della finestra non è precedente di 1 ora rispetto al timestamp massimo degli ordini. -
slideDurationdeve essere minore o uguale awindowDuration.
Usare le finestre di sessione per controllare l'attività dell'utente
Le finestre di sessione non hanno dimensioni fisse. Una finestra si apre quando arriva una riga e si chiude dopo un intervallo di inattività durante il quale non arrivano nuove righe. Usare le finestre di sessione per aggregare i picchi di attività tra periodi di inattività lunghi, ad esempio le visualizzazioni di pagina di un utente entro un periodo di 30 minuti:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
In questo esempio:
-
session_window("timestamp", gapDuration="30 minutes")apre una finestra all'arrivo della prima visualizzazione pagina. Ogni visualizzazione pagina successiva che arriva entro 30 minuti estende la finestra. Quando non arriva alcuna visualizzazione pagina entro 30 minuti, la finestra si chiude e la visualizzazione pagina successiva avvia una nuova finestra. -
withWatermark("timestamp", "1 hour")mantiene l'aggregato di ogni sessione nello stato finché il timestamp di fine finestra non è precedente di 1 ora al timestamp massimo delle visualizzazioni di pagina. - L'argomento
timeColumnperwindow()esession_window()deve essere diTimestampTypeoTimestampNTZType. - Usare
current_timestamp()per definire le finestre in base al tempo di elaborazione anziché all'ora dell'evento. - È possibile impostare durate di finestra da microsecondi fino a giorni. Le durate pari o superiori a un mese non sono supportate.
- Usa la
completemodalità di output con aggregazioni su finestra per mantenere indefinitamente tutto lo stato delle finestre. Usare la modalità di outputappendcon un watermark appropriato per limitare la crescita dello stato ed evitare problemi di memoria con set di dati di grandi dimensioni. Per maggiori dettagli sul comportamento della modalità di output, consulta Watermark e modalità di output per le aggregazioni con finestra.