Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Der Change Data Feed (CDF) erfasst Änderungen auf Zeilenebene zwischen Versionen einer Delta-Lake-Tabelle oder einer Apache-Iceberg-v3-Tabelle.
Azure Databricks unterstützt zwei Ansätze:
- Automatischer Änderungsdatenfeed: Berechnet Änderungen beim Lesen von Tabellen mithilfe von Metadaten zur Zeilenherkunft. Dies erfordert keine individuelle Tabellenkonfiguration und funktioniert auf Delta Lake- und Apache Iceberg v3-Tabellen. Siehe "Datenfeed für automatische Änderung".
- Legacy-Änderungsdatenfeed: Materialisiert Änderungen während Tabellenschreibvorgängen. Unterstützt nur Delta Lake-Tabellen. Erfordert eine individuelle Tabellenkonfiguration. Siehe Legacy-Änderungsdatenfeed für Delta Lake.
Sie können den Änderungsdatenfeed für allgemeine Datennutzungsfälle verwenden, einschließlich:
- Inkrementelle ETL-Pipelines, die nur die Datensätze verarbeiten, die seit der letzten Ausführung der Pipeline geändert wurden.
- Audit-Trails, die Datenänderungen zur Erfüllung von Compliance- und Governance-Anforderungen nachverfolgen.
- Datenreplikations-Workloads, die Änderungen mit nachgelagerten Tabellen, Caches oder externen Systemen synchronisieren.
Datenfeed für automatische Änderungen
Wichtig
Dieses Feature befindet sich in der Public Preview. Arbeitsbereichsadministratoren können den Zugriff auf dieses Feature über die Vorschauseite steuern. Siehe Manage Azure Databricks Previews.
Der Automatische Änderungsdatenfeed berechnet Änderungen auf Zeilenebene zur Abfragezeit anstelle zur Schreibzeit mithilfe der Zeilennachverfolgung für Delta Lake und Zeilenlinie für Apache Iceberg v3. Im Gegensatz zu Legacy-Änderungsdatenfeeds erfordert der Automatische Änderungsdatenfeed keine konfiguration einzelner Tabellen und funktioniert in Delta Lake-Tabellen und Apache Iceberg v3-Tabellen.
Da Änderungen bei Schreibvorgängen für MERGE INTO- und UPDATE-Operationen nicht bei jedem Schreibvorgang berechnet werden, verbessert der automatische Änderungsdatenfeed im Vergleich zum alten Änderungsdatenfeed die Schreibleistung und senkt die Speicherkosten.
Automatischer Änderungsdatenfeed verwendet dieselben table_changes() und readChangeFeed APIs wie der ältere Änderungsdatenfeed und funktioniert mit Batchabfragen, Structured Streaming und Databricks-to-Databricks Delta Lake Sharing. Siehe Änderungen in Batchabfragen lesen und Änderungsdaten inkrementell verarbeiten.
Anforderungen
- Databricks Runtime 18 oder höher
- Ein unterstütztes Tabellenformat, das im Unity-Katalog registriert ist:
- Eine verwaltete Tabelle im Delta Lake-Format mit aktivierter Zeilenverfolgung oder im Iceberg v3-Format.
- Eine externe Tabelle im Delta Lake-Format mit aktivierter Zeilenverfolgung.
Siehe Databricks Unity Catalog-Tabellentypen.
Note
Der Änderungsdatenfeed ist nicht Teil der Apache-Iceberg-Spezifikation. Azure-Databricks-Reader können den automatischen Änderungsdatenfeed für Apache-Iceberg-v3-Tabellen abfragen, externe Iceberg-Reader jedoch nicht. Sehen Sie sich die Eisberg-Tabellenspezifikation an.
Bei Delta Lake können nur Azure Databricks Leser automatische Änderungsdatenfeed abfragen.
Änderungsdatenfeed verwenden
Um den Änderungsdatenfeed zu verwenden, vergewissern Sie sich, dass Sie eine Tabelle verwenden, die die Anforderungen erfüllt. Siehe Anforderungen.
Gehen Sie wie folgt vor, um den Änderungsdatenfeed stapelweise zu lesen:
Python
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("<table_name>")
Scala
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("<table_name>")
SQL
SELECT * FROM table_changes('<table_name>', 0)
Weitere Informationen zu Batch-Lesevorgängen für den Change Data Feed finden Sie unter Änderungen in Batchabfragen lesen.
Gehen Sie wie folgt vor, um den Änderungsdatenfeed als Stream zu lesen:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("<table_name>")
Weitere Informationen zum Streamen von Lesevorgängen für Änderungsdatenfeeds finden Sie unter Inkrementelle Verarbeitung von Änderungsdaten.
Migration vom Legacy-Änderungsdatenfeed
Gehen Sie wie folgt vor, um eine Delta Lake-Tabelle von einem veralteten Änderungsdatenfeed zu einem automatischen Änderungsdatenfeed zu migrieren:
- Stellen Sie sicher, dass Ihre Tabelle die Anforderungen erfüllt.
- Deaktivieren Sie den Legacy-Änderungsdatenfeed, indem Sie den folgenden Befehl ausführen:
ALTER TABLE <table_name> UNSET TBLPROPERTIES ('delta.enableChangeDataFeed');
Sie können sowohl legacy- als auch automatische Änderungsdatenfeeds nicht zusammen verwenden.
Ändern des Datenfeedschemas
Wenn Sie aus dem Änderungsdatenfeed für eine Tabelle lesen, verwendet die Abfrage das Schema für die neueste Tabellenversion. Azure Databricks unterstützt die meisten Schemaänderungs- und Evolutionsvorgänge, tabellen mit Spaltenzuordnung weisen jedoch Einschränkungen auf. Siehe Tabellen mit Spaltenzuordnung.
Zusätzlich zu den Datenspalten aus dem Schema der Delta Lake-Tabelle enthält der Datenfeed Metadatenspalten, die den Typ des Änderungsereignisses identifizieren:
| Spaltenname | Typ | Werte |
|---|---|---|
_change_type |
String | Enthält: insert, , update_preimageupdate_postimage, delete.preimage ist der Wert vor der Aktualisierung, postimage ist der Wert nach der Aktualisierung. |
_commit_version |
Long | Enthält: die Delta-Protokoll- oder Tabellenversion, die die Änderung enthält. |
_commit_timestamp |
Timestamp | Enthält: den Zeitstempel, der dem Erstellungszeitpunkt des Commits zugeordnet ist. |
Wenn das Schema Spalten mit denselben Namen wie diese Metadatenspalten enthält, können Sie den Datenfeed nicht für eine Tabelle verwenden. Bevor Sie den Änderungsdatenfeed aktivieren, benennen Sie die Spalten in Ihrer Tabelle um, um diesen Konflikt zu beheben.
Inkrementelle Verarbeitung von Änderungsdaten
Databricks empfiehlt die Verwendung von Änderungsdatenfeeds in Kombination mit strukturiertem Streaming, um Änderungen aus Tabellen inkrementell zu verarbeiten. Sie müssen strukturiertes Streaming für Azure Databricks verwenden, um Versionen für den Änderungsdatenfeed Ihrer Tabelle automatisch nachzuverfolgen. Informationen zur CDC-Verarbeitung mit SCD-Typ 1- oder Typ 2-Tabellen finden Sie in den AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.
Wenn der Stream erstmals startet, gibt der Change Data Feed den neuesten Snapshot der Tabelle in Form von INSERT Datensätzen zurück und anschließend künftige Änderungen als Änderungsdaten. Change Data Feeds schreiben sowohl Änderungsdaten als auch neue Datenzeilen gleichzeitig in das Transaktionsprotokoll der Tabelle.
Um einen Datenstrom zum Lesen des Änderungsdatenfeeds einer Tabelle zu konfigurieren, legen Sie die Option readChangeFeedtrue wie folgt fest:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("myTable")
Ratenbeschränkungen
Azure Databricks unterstützt Ratelimits (maxFilesPerTrigger, maxBytesPerTrigger) und excludeRegex beim Lesen von Änderungsdaten. Eine vollständige Liste der Streaming-Delta Lake-Optionen finden Sie unter Delta Lake.
Optional können Sie eine Startversion angeben, siehe Angeben einer Startversion. Für Versionen außer dem Ausgangs-Snapshot werden Ratenbegrenzungen atomisch auf gesamte Commits angewendet. Entweder enthält der aktuelle Batch den gesamten Commit, oder der aktuelle Batch verschiebt den Commit in den nächsten Batch.
Verlauf der Wiedergabetabelle
Ein Änderungsdatenfeed dient nicht als permanenter Datensatz aller Änderungen an einer Tabelle. Es zeichnet nur Änderungen auf, die auftreten, nachdem der Change Data Feed aktiviert wurde. Sie können einen neuen Streaming-Lesevorgang starten, um die aktuelle Version und alle nachfolgenden Änderungen zu erfassen.
Datensätze im Änderungsdatenfeed sind vorübergehend und nur für ein bestimmtes Aufbewahrungsfenster zugänglich. Transaktionsprotokolle entfernen Tabellenversionen und die entsprechenden Versionen des Änderungsdatenfeeds in regelmäßigen Abständen. Wenn eine Version entfernt wird, können Sie den Änderungsdatenfeed für diese Version nicht mehr lesen.
Änderungsdaten für eine dauerhafte Historie archivieren
Wenn Ihr Anwendungsfall erfordert, dass Sie einen dauerhaften Verlauf aller Änderungen an einer Tabelle beibehalten, verwenden Sie inkrementelle Logik, um Datensätze aus dem Änderungsdatenfeed in eine neue Tabelle zu schreiben.
Das folgende Beispiel zeigt, wie trigger.AvailableNow verwendet wird, um verfügbare Daten als Batch-Arbeitslast für die Auditierung oder die vollständige Wiederholung aller Änderungen zu verarbeiten:
Python
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Angeben einer Startversion
Wenn Sie Änderungen von einem bestimmten Punkt lesen möchten, geben Sie eine Startversion entweder mit einem Zeitstempel oder einer Versionsnummer an. Startversionen sind für Batchlesevorgänge erforderlich. Optional können Sie eine Endversion angeben, um den Bereich einzuschränken. Weitere Informationen zum Tabellenverlauf finden Sie unter Was ist Zeitreise?.
Wenn Sie strukturierte Streaming-Workloads konfigurieren, die Änderungsdatenfeed verwenden, kann sich die Angabe einer Startversion auf die Verarbeitungsleistung auswirken:
- Neue Datenverarbeitungspipelines profitieren in der Regel vom Standardverhalten, das alle vorhandenen Datensätze in der Tabelle als
INSERTVorgänge beim ersten Start des Datenstroms aufzeichnet. - Wenn Ihre Zieltabelle bereits alle Datensätze mit entsprechenden Änderungen bis zu einem bestimmten Punkt enthält, geben Sie eine Startversion an, um die Verarbeitung des Quelltabellenzustands als
INSERT-Ereignisse zu vermeiden.
Das folgende Beispiel zeigt, wie Sie einen Streamingfehler mit einem beschädigten Prüfpunkt wiederherstellen. Gehen Sie in diesem Beispiel von den folgenden Bedingungen aus:
- Der Datenfeed wurde für die Quelltabelle bei der Tabellenerstellung aktiviert.
- Die Ziel-Downstream-Tabelle verarbeitete alle Änderungen bis einschließlich Version 75.
- Der Versionsverlauf für die Quelltabelle ist für die Versionen 70 und höher verfügbar.
Wenn Sie den Schreibdatenstrom für die vorhandene Zieltabelle definieren, müssen Sie einen neuen Checkpoint-Speicherort angeben:
Python
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
)
Scala
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
.writeStream
.option("checkpointLocation", "<new-checkpoint-path>")
.toTable("target_table")
Wichtig
Wenn Sie eine Startversion angeben und diese Version im Tabellenverlauf nicht verfügbar ist, kann der Datenstrom nicht mit einem neuen Prüfpunkt beginnen. Da verwaltete Tabellen historische Versionen automatisch bereinigen, werden alle angegebenen Startversionen schließlich gelöscht.
Siehe Verlauf der Wiederholungstabelle.
Lesen von Änderungen in Batchabfragen
Sie können die Batchabfragesyntax verwenden, um alle Änderungen ab einer bestimmten Version zu lesen oder Änderungen innerhalb eines bestimmten Versionsbereichs wie folgt zu lesen:
- Geben Sie Versionen als ganze Zahlen und Zeitstempel als Zeichenfolgen im Format
yyyy-MM-dd[ HH:mm:ss[.SSS]]an. - Die Start- und Endversionen sind inbegriffen. Wenn Sie von einer Startversion bis zur neuesten Version lesen möchten, geben Sie nur die Startversion an.
- Wenn Sie eine Version angeben, bevor der Datenfeed geändert wurde, wird ein Fehler ausgelöst.
Gehen Sie wie folgt vor, um Batchlesevorgänge mit Start- und Endversionsoptionen zu verwenden:
SQL
Um von Version 0 bis 10 zu lesen, gehen Sie wie folgt vor:
SELECT * FROM table_changes('tableName', 0, 10)
Gehen Sie wie folgt vor, um zwischen zwei Zeitstempelversionen zu lesen:
--
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
Gehen Sie wie folgt vor, um von einer Startversion auf die neueste Version zu lesen:
SELECT * FROM table_changes('tableName', 0)
Gehen Sie wie folgt vor, um Änderungen für eine Tabelle mit Sonderzeichen im Namen zu lesen:
SELECT * FROM table_changes('`schema`.`dotted.tableName`', '2021-04-21 06:45:46', '2021-05-21 12:00:00')
Siehe table_changes Tabellenwertfunktion.
Python
Um von Version 0 bis 10 zu lesen, gehen Sie wie folgt vor:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
Gehen Sie wie folgt vor, um zwischen zwei Zeitstempeln zu lesen:
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
Gehen Sie wie folgt vor, um von einer Startversion auf die neueste Version zu lesen:
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala
Gehen Sie wie folgt vor, um von Version 0 bis 10 zu lesen:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
Gehen Sie wie folgt vor, um zwischen zwei Zeitstempeln zu lesen:
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
Gehen Sie wie folgt vor, um von einer Startversion auf die neueste Version zu lesen:
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Behandlung von Versionen außerhalb des zulässigen Bereichs
Wenn Sie standardmäßig eine Version oder einen Zeitstempel angeben, der den letzten Commit überschreitet, gibt die Abfrage den Fehler zurück timestampGreaterThanLatestCommit.
In Databricks Runtime 11.3 LTS und höher können Sie die Toleranz für Out-of-Range-Versionen wie folgt aktivieren:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Wenn diese Konfiguration aktiviert ist, gibt die Abfrage unterschiedliche Ergebnisse wie folgt zurück:
- Eine Startversion oder ein Zeitstempel nach dem letzten Commit gibt ein leeres Ergebnis zurück.
- Eine endende Version oder ein Zeitstempel über den letzten Commit hinaus gibt alle Änderungen vom Anfang bis zum letzten Commit zurück.
Legacy-Änderungsdatenfeed für Delta Lake
Der Legacy-Change-Data-Feed muss für einzelne Delta Lake-Tabellen manuell konfiguriert werden. Da der Änderungsdatenfeed nicht in der Spezifikation von Apache Iceberg enthalten ist, werden Apache Iceberg-Tabellen nicht unterstützt. Databricks empfiehlt, zum automatischen Änderungsdatenfeed zu migrieren. Siehe Migration vom Legacy-Änderungsdatenfeed.
Wenn der Legacy-Change-Data-Feed aktiviert ist, erfasst die Laufzeit Änderungsereignisse für sämtliche in die Tabelle geschriebenen Daten. Dazu gehören die Zeilendaten sowie Metadaten, die angeben, ob die angegebene Zeile eingefügt, gelöscht oder aktualisiert wurde.
Der Legacy-Change-Data-Feed verwendet dieselben readChangeFeed und table_changes() Lese-APIs wie der automatische Change-Data-Feed. Siehe inkrementelle Verarbeitung von Änderungsdaten und Lesen von Änderungen in Batchabfragen.
Veralteten Änderungsdatenfeed aktivieren
Sie müssen den Legacy-Change-Data-Feed für einzelne Tabellen ausdrücklich aktivieren. Verwenden Sie eine der folgenden Methoden:
Neue Tabelle
Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im CREATE TABLE Befehl fest.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Note
Wenn Sie den Legacyänderungsdatenfeed für ein beliebiges Zeitintervall deaktivieren und dann erneut aktivieren, kann das Intervall nicht abgefragt werden. Verwenden Sie den Datenfeed für automatische Änderungen, um Änderungen während des Intervalls abzufragen. Siehe "Datenfeed für automatische Änderung".
Vorhandene Tabelle
Legen Sie die Tabelleneigenschaft delta.enableChangeDataFeed = true im ALTER TABLE Befehl fest.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Überlegungen zur Speicherung
Verwaltete Tabellen zeichnen Datenänderungen effizient auf und verwenden möglicherweise andere Features zum Optimieren des Speicherlayouts.
Beim älteren Änderungsdatenfeed müssen Sie folgendes Speicherverhalten berücksichtigen:
- Möglicherweise sehen Sie eine geringe Erhöhung der Speicherkosten, da Änderungen möglicherweise in separaten Dateien aufgezeichnet werden.
- Einige Vorgänge, wie reine Einfügevorgänge oder Löschungen ganzer Partitionen, erzeugen keine Änderungsdaten-Dateien. Azure Databricks berechnet den Änderungsdatenfeed direkt aus dem Transaktionsprotokoll.
- Änderungsdatendateien verwenden die Aufbewahrungsrichtlinie der Tabelle. Mit dem Befehl
VACUUMwerden Dateien mit Änderungsdaten gelöscht, und für Änderungen aus dem Transaktionsprotokoll gilt die Checkpoint-Aufbewahrungsrichtlinie.
Databricks empfiehlt, den Änderungsdatenfeed nicht zu rekonstruieren, indem Sie Änderungsdatendateien direkt abfragen. Verwenden Sie immer Delta Lake- und Apache Iceberg-APIs.
Einschränkungen
Beachten Sie die folgenden Einschränkungen für Änderungsdatenfeeds:
Tabellen mit Spaltenzuordnung
Wenn die Spaltenzuordnung in einer Delta Lake-Tabelle aktiviert ist, können Sie Spalten ablegen oder umbenennen, ohne Datendateien neu zu schreiben. Siehe Umbenennen und Löschen von Spalten mit der Delta Lake-Spaltenzuordnung.
Änderungsdatenfeeds weisen jedoch nach nicht-additiven Schemaänderungen Einschränkungen auf. Nicht-additive Schemaänderungen umfassen die folgenden Vorgänge:
- Spalten umbenennen oder ablegen.
- Ändern von Spaltendatentypen.
- Nullfähigkeit einer Spalte ändern, z. B. mit
ALTER COLUMN ... SET NOT NULL. Siehe Festlegen einerNOT NULLEinschränkung in Azure Databricks.
Sie können keine Änderungsdatenfeeds für eine Transaktion oder einen Bereich lesen, in denen eine nicht additive Schemaänderung auftritt.
Um änderungen nicht additiver Schemas vor oder nach dem angegebenen Batchlesebereich zuzulassen, verwenden Abfragen das Schema der Endversion des Bereichs und nicht die neueste Tabellenversion. Abfragen schlagen weiterhin fehl, wenn sich der Versionsbereich über eine nicht additive Schemaänderung erstreckt.
Datenfeed für automatische Änderungen
- Da der Änderungsdatenfeed in der Apache Iceberg-Spezifikation nicht unterstützt wird, können externe Iceberg-Clients den automatischen Änderungsdatenfeed nicht abfragen. Sehen Sie sich die Eisberg-Tabellenspezifikation an.
- Bei Transaktionen mit mehreren Anweisungen wird der automatische Change Data Feed nicht unterstützt, wenn die Quelltabelle während der Transaktion geändert wurde.
- Automatischer Änderungsdatenfeed wird für Tabellen mit Zeilenfiltern oder Spaltenmasken nicht unterstützt. Siehe Zeilenfilter und Spaltenmasken.
- Abfragen des Change Data Feed können sich nicht über Tabellenversionen erstrecken, bei denen eine nicht additive Schemaänderung aufgetreten ist, etwa das Umbenennen oder Löschen einer Spalte oder eine Änderung des Datentyps. Teilen Sie die Abfrage vor und nach der Schemaänderung in Bereiche auf.