Referenz zu Spark-API-Optionen

Auf dieser Seite sind die verfügbaren Eingabe- und Ausgabeoptionen für Spark-APIs aufgeführt, die Daten lesen und schreiben.

DataFrameReader-Optionen

Verwenden Sie diese Optionen mit DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO und Auto Loader, um zu steuern, wie Azure Databricks Datendateien liest.

Example

Im folgenden Beispiel wird festgelegt multiLine , dass True JSON-Dateien gelesen werden:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

Gemeinsam

Die folgenden Optionen gelten für alle Dateiformate.

Schlüssel Vorgabe Gültige Werte Description
ignoreCorruptFiles false true, false Gibt an, ob beschädigte Dateien ignoriert werden sollen. Bei TRUE werden die Spark-Aufträge weiterhin ausgeführt, wenn beschädigte Dateien festgestellt werden, und gelesene Inhalte werden weiterhin zurückgegeben. Für COPY INTO, können Sie übersprungene Dateien wie numSkippedCorruptFiles in der operationMetrics Spalte der Delta Lake Geschichte beobachten. Verfügbar in Databricks Runtime 11.3 LTS und höher.
ignoreMissingFiles false für das automatische Laden true für COPY INTO (Legacy) true, false Gibt an, ob fehlende Dateien ignoriert werden sollen. Wenn true, werden die Spark-Aufträge weiterhin ausgeführt, wenn fehlende Dateien auftreten und die Inhalte weiterhin zurückgegeben werden. Verfügbar in Databricks Runtime 11.3 LTS und höher.
modifiedAfter None Eine Zeitstempelzeichenfolge Ein optionaler Zeitstempel als Filter, um nur Dateien aufzunehmen, die nach dem angegebenen Zeitstempel einen Änderungszeitstempel aufweisen.
modifiedBefore None Eine Zeitstempelzeichenfolge Ein optionaler Zeitstempel als Filter, um nur Dateien aufzunehmen, die einen Änderungszeitstempel vor dem angegebenen Zeitstempel aufweisen.
pathGlobFilter oder fileNamePattern None Eine Glob-Musterzeichenfolge Ein potenzielles Globmuster für die Auswahl von Dateien. PATTERN Entspricht in COPY INTO (Vorversion). fileNamePattern kann in read_files verwendet werden.
recursiveFileLookup false true, false Wenn truediese Option geschachtelte Verzeichnisse durchsucht, auch wenn deren Namen keinem Partitionsbenennungsschema folgen, z date=2019-07-01. B. .

Avro

Die folgenden Optionen gelten beim Lesen von Avro-Dateien.

Schlüssel Vorgabe Gültige Werte Description
avroSchema None Eine Avro-Schemazeichenfolge Optionales Schema, das von einem Benutzer im Avro-Format angegeben wird. Beim Lesen von Avro kann diese Option auf ein weiterentwickeltes Schema festgelegt werden, das kompatibel ist, aber sich von dem tatsächlichen Avro-Schema unterscheidet. Das Deserialisierungsschema entspricht dem weiterentwickelten Schema. Wenn Sie beispielsweise ein weiterentwickeltes Schema mit einer zusätzlichen Spalte mit einem Standardwert festlegen, enthält das Leseergebnis auch die neue Spalte.
avroSchemaEvolutionMode none none, restart Behandeln der Schemaentwicklung bei Verwendung einer Schemaregistrierung none ignoriert Schemaänderungen und setzt den Auftrag fort. restart löst ein UnknownFieldException , wenn Schemaänderungen erkannt werden und ein Auftragsneustart erforderlich ist.
datetimeRebaseMode LEGACY EXCEPTION, LEGACYCORRECTED Steuert, ob DATE- und TIMESTAMP-Werte auf dem gregorianischen Kalender und dem proleptischen gregorianischen Kalender basieren sollen.
enableStableIdentifiersForUnionType false true, false Gibt an, ob stabile Feldnamen für Avro Union-Typen verwendet werden sollen. Wenn diese Option aktiviert ist, werden Die Namen von Union-Typfeldern aus ihren Typnamen in Kleinbuchstaben abgeleitet (z member_int. B. , member_string). Löst eine Ausnahme aus, wenn zwei Typnamen nach der Unterschreibweise identisch sind.
mergeSchema false true, false Gibt an, ob das Schema über mehrere Dateien hinweg abgeleitet und das Schema der einzelnen Dateien zusammengeführt werden soll. mergeSchema für Avro bewirkt keine Lockerung von Datentypen.
mode FAILFAST FAILFAST, PERMISSIVEDROPMALFORMED Parsermodus für die Behandlung beschädigter Datensätze. FAILFAST löst eine Ausnahme aus. PERMISSIVE legt falsch formatierte Felder auf NULL fest. DROPMALFORMED Löscht im Hintergrund schlechte Datensätze.
readerCaseSensitive true true, false Diese Option gibt das Verhalten bei Groß- und Kleinschreibung an, wenn rescuedDataColumn aktiviert ist. Wenn true, retten Sie die Datenspalten, deren Namen sich vom Schema unterscheiden. Wenn "false" lautet, lesen Sie die Daten ohne Groß-/Kleinschreibung.
recursiveFieldMaxDepth None 0 bis 15 Die maximale Rekursionstiefe für rekursive Avro-Felder. Legen Sie fest 1 , dass alle rekursiven Felder abgeschnitten werden sollen, 2 um eine Rekursionsebene usw. zuzulassen 15. Wenn nicht festgelegte oder 0rekursive Felder nicht zulässig sind.
rescuedDataColumn None Eine Spaltennamenzeichenfolge Gibt an, ob alle Daten, die aufgrund eines Datentypkonflikts oder eines Schemakonflikts (einschließlich der Schreibweise von Spaltennamen) nicht geparst werden können, in einer separaten Spalte gesammelt werden sollen. Diese Spalte ist bei Verwendung des Autoloaders standardmäßig enthalten.
COPY INTO (Legacy) unterstützt die gerettete Datenspalte nicht, da Sie das Schema nicht manuell mit COPY INTOfestlegen können. Databricks empfiehlt die Verwendung des automatischen Ladens für die meisten Aufnahmeszenarien.
Weitere Informationen finden Sie unter "Was ist die Spalte mit den geretteten Daten?".
stableIdentifierPrefixForUnionType member_ Beliebige Zeichenfolge Das Präfix, das für stabile Union-Typ-Feldnamen verwendet werden soll, wenn enableStableIdentifiersForUnionType=true.

CSV

Die folgenden Optionen gelten beim Lesen von CSV-Dateien.

Schlüssel Vorgabe Gültige Werte Description
badRecordsPath None Eine Pfadzeichenfolge Der Speicherpfad für Dateien, die Informationen über fehlerhafte CSV-Datensätze aufzeichnen.
charToEscapeQuoteEscaping \0 Ein einzelnes Zeichen Das Zeichen, das als Escapezeichen für das Zeichen verwendet wird, das als Escapezeichen für Anführungszeichen verwendet wird. z. B. für den Datensatz [ " a\\", b ]:
  • Wenn das Zeichen, um dem '\' zu entkommen, nicht definiert ist, werden die Daten nicht analysiert. Der Parser liest die Zeichen [a],[\],["],[,],[ ],[b] und löst einen Fehler aus, da kein schließendes Anführungszeichen gefunden wird.
  • Wenn das Zeichen, das zum Escapen des '\' verwendet wird, als '\' definiert ist, wird der Datensatz mit 2 Werten gelesen: [a\] und [b].
columnNameOfCorruptRecord _corrupt_record Eine Spaltennamenzeichenfolge Unterstützt für das Auto-Ladeprogramm Wird für COPY INTO (Legacy) nicht unterstützt.
Die Spalte zum Speichern von Datensätzen, die fehlerhaft formatiert sind und nicht analysiert werden können. Wenn mode für die Analyse auf DROPMALFORMED festgelegt ist, ist diese Spalte leer.
comment \0 Ein einzelnes Zeichen Definiert das Zeichen, das einen Zeilenkommentar darstellt, wenn es am Anfang einer Textzeile steht. Verwenden Sie '\0', um das Überspringen von Kommentaren zu deaktivieren.
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Das Format für die Analyse von Datumszeichenfolgen.
emptyValue Leere Zeichenfolge Beliebige Zeichenfolge Zeichenfolgendarstellung eines leeren Werts.
enableDateTimeParsingFallback false true, false Gibt an, ob auf das Alte Datums- und Zeitstempelanalyseverhalten zurückfallen soll, wenn ein Wert nicht mit dem angegebenen Format analysiert werden kann. Beim falseAnalysieren von Fehlern wird ein Fehler ausgelöst oder je nach modeNull erzeugt.
encoding oder charset UTF-8 Ein java.nio.charset.Charset Name Der Name der Codierung der CSV-Dateien. Eine Liste der Optionen finden Sie unter java.nio.charset.Charset. UTF-16 und UTF-32 können nicht verwendet werden, wenn multiline ist true.
enforceSchema true true, false Gibt an, ob das angegebene oder abgeleitete Schema zwangsweise auf die CSV-Dateien angewendet werden soll. Wenn die Option aktiviert ist, werden Kopfzeilen von CSV-Dateien ignoriert. Diese Option wird standardmäßig ignoriert, wenn der Autoloader verwendet wird, um Daten zu retten und die Schemaentwicklung zu ermöglichen.
escape \ Ein einzelnes Zeichen Das Escapezeichen, das beim Analysieren der Daten verwendet werden soll.
extension csv Eine Dateierweiterungszeichenfolge Die erwartete Dateinamenerweiterung für Lesevorgänge. Dateien ohne diese Erweiterung werden herausgefiltert.
failOnUnknownFields false true, false Gibt an, ob ein Fehler auftritt, wenn der CSV-Eintrag Spalten enthält, die im Schema nicht vorhanden sind. Wenn false, nicht erkannte Spalten werden in Abhängigkeit von rescuedDataColumn.
failOnWidenedFields false true, false Gibt an, ob ein Feldwert nicht als deklarierter Schematyp analysiert werden kann, ohne zu verbreitern. Wenn false, type-widened values are silently rescueed depending on rescuedDataColumn. Die Einstellung failOnUnknownFields=true kann die Effekte dieser Option maskierung.
header false true, false Gibt an, ob die CSV-Dateien ein Kopfzeile enthalten. Der Autoloader geht bei der Schemaableitung davon aus, dass Dateien Kopfzeilen enthalten.
ignoreLeadingWhiteSpace false true, false Gibt an, ob führende Leerzeichen für einzelne analysierte Werte ignoriert werden sollen.
ignoreTrailingWhiteSpace false true, false Gibt an, ob nachstehende Leerzeichen für einzelne analysierte Werte ignoriert werden sollen.
inferSchema false true, false Gibt an, ob die Datentypen der analysierten CSV-Datensätze abgeleitet werden sollen oder angenommen werden soll, dass alle Spalten den Typ StringType aufweisen. Bei Festlegung auf true ist eine zusätzliche Übergabe der Daten erforderlich. Verwenden Sie für den Autoloader stattdessen cloudFiles.inferColumnTypes.
inputBufferSize 1048576 (1 MB) Positive Zahlen Die Puffergröße in Byte für den CSV-Parser. Nützlich für die Optimierung der Speicherauslastung beim Analysieren großer CSV-Dateien.
lineSep Keine, die , \r\r\n, und\n Eine Zeichenfolge Eine Zeichenfolge zwischen zwei aufeinander folgenden CSV-Datensätzen.
locale US Ein java.util.Locale Bezeichner Ein Java Gebietsschema identifiziert, das sich auf das Standarddatum, den Zeitstempel und die Dezimalanalyse innerhalb der CSV auswirkt.
maxCharsPerColumn -1 Positive ganze Zahlen oder -1 für unbegrenzt Maximale Anzahl von Zeichen, die von einem zu analysierenden Wert erwartet werden. Kann verwendet werden, um Speicherfehler zu vermeiden. Der Standardwert ist -1, d. h. unbegrenzt.
maxColumns 20480 Positive Zahlen Der absolute Höchstwert für die Anzahl der Spalten, die ein Datensatz enthalten kann.
mergeSchema false true, false Gibt an, ob das Schema über mehrere Dateien hinweg abgeleitet und das Schema der einzelnen Dateien zusammengeführt werden soll. Standardmäßig für Autoloader aktiviert, wenn das Schema abgeleitet wird.
mode PERMISSIVE PERMISSIVE, DROPMALFORMEDFAILFAST Parsermodus für die Verarbeitung fehlerhaft formatierter Datensätze.
multiLine false true, false Gibt an, ob die CSV-Datensätze mehrere Zeilen umfassen.
nanValue NaN Beliebige Zeichenfolge Die Zeichenfolgendarstellung eines NaN-Werts, wenn FloatType- und DoubleType-Spalten verwendet werden.
negativeInf -Inf Beliebige Zeichenfolge Die Zeichenfolgendarstellung von negativ Unendlich, wenn FloatType- und DoubleType-Spalten verwendet werden.
nullValue Leere Zeichenfolge Beliebige Zeichenfolge Zeichenfolgendarstellung eines NULL-Werts.
parserCaseSensitive (veraltet) false true, false Gibt beim Lesen von Dateien an, ob Spalten, die in der Kopfzeile deklariert sind, unter Berücksichtigung der Groß-/Kleinschreibung am Schema angepasst werden sollen. Diese Option ist für den Autoloader standardmäßig true. Spalten, deren Groß-/Kleinschreibung abweicht, werden in die rescuedDataColumn-Spalte gerettet (sofern aktiviert). Diese Option wurde durch readerCaseSensitive ersetzt und gilt als veraltet.
positiveInf Inf Beliebige Zeichenfolge Die Zeichenfolgendarstellung von positiv Unendlich, wenn FloatType- und DoubleType-Spalten verwendet werden.
preferDate true true, false Versucht, Zeichenfolgen nach Möglichkeit als Datumsangaben abzuleiten, nicht als Zeitstempel. Sie müssen auch die Schemaausleitung verwenden, indem Sie das automatische Laden aktivieren inferSchema oder verwenden cloudFiles.inferColumnTypes .
quote " Ein einzelnes Zeichen Das Zeichen, das als Escapezeichen für Werte verwendet wird, bei denen das Feldtrennzeichen Bestandteil des Werts ist.
readerCaseSensitive true true, false Diese Option gibt das Verhalten bei Groß- und Kleinschreibung an, wenn rescuedDataColumn aktiviert ist. Wenn true, retten Sie die Datenspalten, deren Namen sich vom Schema unterscheiden. Wenn "false" lautet, lesen Sie die Daten ohne Groß-/Kleinschreibung.
rescuedDataColumn None Eine Spaltennamenzeichenfolge Gibt an, ob alle Daten, die aufgrund eines Datentypkonflikts oder eines Schemakonflikts (einschließlich der Schreibweise von Spaltennamen) nicht geparst werden können, in einer separaten Spalte gesammelt werden sollen. Diese Spalte ist bei Verwendung des Autoloaders standardmäßig enthalten. Weitere Informationen finden Sie unter "Was ist die Spalte mit den geretteten Daten?".
COPY INTO (Legacy) unterstützt die gerettete Datenspalte nicht, da Sie das Schema nicht manuell mit COPY INTOfestlegen können. Databricks empfiehlt die Verwendung des automatischen Ladens für die meisten Aufnahmeszenarien.
sep oder delimiter , Eine Zeichenfolge Die Trennzeichenfolge zwischen Spalten.
singleVariantColumn None Eine Spaltennamenzeichenfolge Wenn sie auf einen Spaltennamen festgelegt ist, wird der gesamte CSV-Eintrag in einer einzelnen VariantType Spalte mit diesem Namen gelesen, anstatt jedes Feld in eine eigene Spalte zu analysieren. Erfordert header=true.
skipRows 0 Positive ganze Zahlen oder 0 Die Anzahl der Zeilen vom Anfang der CSV-Datei, die ignoriert werden soll, einschließlich kommentierter und leerer Zeilen. Wenn header „True“ ist, ist die Kopfzeile die erste nicht übersprungene und nicht auskommentierte Zeile.
timeFormat HH:mm:ss Eine Zeitformatzeichenfolge Das Format für die Analyse von TimeType Spaltenwerten.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Das Format zum Analysieren von Zeitstempelzeichenfolgen.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Das Format für die Analyse des Zeitstempels ohne Zeitzonenzeichenfolgen .The format for parsing timestamp without timezone (TimestampNTZType) strings.
timeZone None Eine java.time.ZoneId Zeichenfolge Die java.time.ZoneId, die beim Analysieren von Zeitstempeln und Datumsangaben verwendet werden soll.
unescapedQuoteHandling STOP_AT_DELIMITER STOP_AT_CLOSING_QUOTE, BACK_TO_DELIMITER, STOP_AT_DELIMITER, SKIP_VALUE, , RAISE_ERROR Die Strategie für die Behandlung von Anführungszeichen ohne Escapezeichen. Das Verhalten jeder zulässigen Option lautet wie folgt:
  • STOP_AT_CLOSING_QUOTE: Wenn in der Eingabe Anführungszeichen ohne Escapezeichen erkannt werden, wird das Anführungszeichen akkumuliert und der Wert als Wert in Anführungszeichen analysiert, bis ein schließendes Anführungszeichen erkannt wird.
  • BACK_TO_DELIMITER: Wenn in der Eingabe Anführungszeichen ohne Escapezeichen erkannt werden, wird der Wert als Wert ohne Anführungszeichen betrachtet. Der Parser akkumuliert dann alle Zeichen des aktuellen analysierten Werts, bis das von sep definierte Trennzeichen gefunden wird. Wenn im Wert kein Trennzeichen gefunden wird, akkumuliert der Parser weiter Zeichen aus der Eingabe, bis ein Trennzeichen oder Zeilenende gefunden wird.
  • STOP_AT_DELIMITER: Wenn in der Eingabe Anführungszeichen ohne Escapezeichen erkannt werden, wird der Wert als Wert ohne Anführungszeichen betrachtet. Dadurch wird der Parser veranlasst, alle Zeichen zu akkumulieren, bis das durch sep definierte Trennzeichen oder ein Zeilenende in der Eingabe gefunden wird.
  • SKIP_VALUE: Wenn in der Eingabe Anführungszeichen ohne Escapezeichen gefunden werden, wird der Inhalt, der für den angegebenen Wert geparst wurde, übersprungen (bis das nächste Trennzeichen gefunden wird) und stattdessen wird der in nullValue angegebene Wert erzeugt.
  • RAISE_ERROR: Wenn nicht gescapete Anführungszeichen in der Eingabe gefunden werden, wird eine TextParsingException ausgelöst.

Excel

Die folgenden Optionen gelten beim Lesen Excel Dateien.

Schlüssel Vorgabe Gültige Werte Description
dataAddress None Eine Zellenbereich- oder Blattnamenzeichenfolge Der zu lesende Zellbereich in Excel Syntax. Wenn dieser Wert nicht angegeben wird, werden alle gültigen Zellen aus dem ersten Blatt gelesen. Dient SheetName!C5:H10 zum Lesen eines Bereichs aus einem benannten Blatt, C5:H10 zum Lesen eines Bereichs vom ersten Blatt oder SheetName zum Lesen aller Daten aus einem bestimmten Blatt.
headerRows 0 0, 1 Anzahl der anfänglichen Zeilen, die als Spaltennamenüberschriften verwendet werden sollen. Wenn dataAddress angegeben, gilt dies innerhalb des Zellbereichs. Wenn 0, Spaltennamen automatisch generiert werden als _c1, _c2, , _c3usw.
ignoreMissingSheet false true, false Gibt an, ob Dateien ohne Hintergrund übersprungen werden sollen, die nicht das durch dataAddress. Wenn falseein Fehler ausgelöst wird, wenn eine Datei das angeforderte Blatt fehlt. Gilt nur, wenn ein Blattname angegeben dataAddresswird.
includePhoneticRuns false true, false Gibt an, ob phonetische Anmerkungen (z. B. Pinyin oder Furigana) beim Lesen von XLSX-Dateien mit Zellzeichenfolgenwerten verkettet werden sollen.
operation readSheet readSheet, listSheets Der Vorgang, der für die Excel Arbeitsmappe ausgeführt werden soll. readSheet liest Daten aus einem Blatt. listSheets gibt eine Struktur mit Feldern sheetIndex: long und sheetName: String für jedes Blatt zurück.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Benutzerdefinierte Formatzeichenfolge für Zeitstempel-ohne-Zeitzone-Werte, die als Zeichenfolgen in Excel gespeichert sind. Benutzerdefinierte Datumsformate folgen den Formaten der Datum-Zeit-Muster.
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Benutzerdefinierte Formatzeichenfolge für Zeichenfolgenwerte, die als gelesen werden Date. Benutzerdefinierte Datumsformate folgen den Formaten der Datum-Zeit-Muster.

JSON

Die folgenden Optionen gelten beim Lesen von JSON-Dateien.

Schlüssel Vorgabe Gültige Werte Description
allowBackslashEscapingAnyCharacter false true, false Gibt an, ob umgekehrte Schrägstriche als Escapezeichen für das folgende Zeichen zugelassen werden sollen. Wenn diese Option nicht aktiviert ist, können nur Zeichen mit Escapezeichen versehen werden, die explizit in der JSON-Spezifikation aufgeführt werden.
allowComments false true, false Gibt an, ob die Verwendung von Java-, C- und C++-Kommentaren ('/', '*' bzw. '//') in analysierten Inhalten zugelassen werden soll oder nicht.
allowNonNumericNumbers true true, false Gibt an, ob die Menge der NaN-Token (Not-a-Number) als zulässige Gleitkommazahlenwerte zugelassen werden soll.
allowNumericLeadingZeros false true, false Gibt an, ob ganze Zahlen mit zusätzlichen (zu ignorierenden) Nullen beginnen sollen (z. B. 000001).
allowSingleQuotes true true, false Gibt an, ob die Verwendung von einfachen Anführungszeichen (Apostroph, Zeichen '\') für das Zitieren von Zeichenfolgen, einschließlich Namen und Werte, zugelassen werden soll.
allowUnquotedControlChars false true, false Gibt an, ob JSON-Zeichenfolgen Steuerzeichen ohne Escapezeichen (ASCII-Zeichen mit einem Wert kleiner als 32, z. B. Tabstopp- und Zeilenvorschubzeichen) enthalten dürfen.
allowUnquotedFieldNames false true, false Gibt an, ob nicht angestellte Feldnamen verwendet werden sollen, die von JavaScript zulässig sind, aber nicht durch die JSON-Spezifikation.
alternateVariantEncoding None Z85 Die Codierung, die für Variant-Werte im JSON-Quellwert verwendet wird. Legen Sie diesen Wert fest, um Z85 Variant-Werte zu decodieren, die base85-codiert wurden, anstatt als Inline-JSON zu speichern.
badRecordsPath None Eine Pfadzeichenfolge Der Pfad zum Speichern von Dateien zur Aufzeichnung von Informationen über fehlerhafte JSON-Datensätze.
Die Verwendung der badRecordsPath Option in einer dateibasierten Datenquelle hat die folgenden Einschränkungen:
  • Es ist nicht transaktional und kann zu inkonsistenten Ergebnissen führen.
  • Vorübergehende Fehler werden als Fehler behandelt.
columnNameOfCorruptRecord _corrupt_record Eine Spaltennamenzeichenfolge Die Spalte zum Speichern von Datensätzen, die fehlerhaft formatiert sind und nicht analysiert werden können. Wenn mode für die Analyse auf DROPMALFORMED festgelegt ist, ist diese Spalte leer.
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Das Format für die Analyse von Datumszeichenfolgen.
dropFieldIfAllNull false true, false Ob Spalten während der Schemaerkennung ignoriert werden sollen, die nur NULL-Werte oder leere Arrays und Strukturen enthalten.
encoding oder charset UTF-8 Ein java.nio.charset.Charset Name Der Name der Codierung der JSON-Dateien. Eine Liste der Optionen finden Sie unter java.nio.charset.Charset. Sie können UTF-16 und UTF-32 nicht verwenden, wenn multilinetrue ist.
inferTimestamp false true, false Gibt an, ob versucht werden soll, Zeitstempelzeichenfolgen als TimestampType abzuleiten. Bei Festlegung auf true", kann die Schema-Ableitung spürbar länger dauern. Für die Verwendung mit dem Autoloader müssen Sie cloudFiles.inferColumnTypes aktivieren.
lineSep Keine, die , \r\r\n, und\n Eine Zeichenfolge Eine Zeichenfolge zwischen zwei aufeinander folgenden JSON-Datensätzen.
locale US Ein java.util.Locale Bezeichner Ein Java Gebietsschemabezeichner, der sich auf das Standarddatum, den Zeitstempel und die Dezimalanalyse innerhalb des JSON-Codes auswirkt.
maxNestingDepth 500 Positive Zahlen Die maximal zulässige Schachtelungstiefe für JSON-Objekte und Arrays. Erhöhen Sie diesen Wert für tief geschachtelte Dokumente.
maxNumLen 1000 Positive Zahlen Die maximale Länge von Zahlentoken in der JSON-Eingabe. Erhöhen Sie diesen Wert für JSON mit großen numerischen Literalen.
maxStringLen Unbeschränkt Positive Zahlen Die maximale Länge von Zeichenfolgenwerten in der JSON-Eingabe. Legen Sie fest, dass die Speicherauslastung beim Analysieren von JSON mit großen Zeichenfolgen begrenzt wird.
mode PERMISSIVE PERMISSIVE, DROPMALFORMEDFAILFAST Parsermodus für die Verarbeitung fehlerhaft formatierter Datensätze.
multiLine false true, false Gibt an, ob die JSON-Datensätze mehrere Zeilen umfassen.
prefersDecimal false true, false Versucht, Zeichenfolgen nach Möglichkeit als DecimalType abzuleiten, nicht als float- oder double-Typ. Sie müssen auch die Schemaausleitung verwenden, indem Sie das automatische Laden aktivieren inferSchema oder verwenden cloudFiles.inferColumnTypes .
primitivesAsString false true, false Gibt an, ob primitive Typen wie Zahlen und boolesche Werte als StringType abgeleitet werden sollen.
readerCaseSensitive true true, false Diese Option gibt das Verhalten bei Groß- und Kleinschreibung an, wenn rescuedDataColumn aktiviert ist. Wenn true, retten Sie die Datenspalten, deren Namen sich vom Schema unterscheiden. Wenn "false" lautet, lesen Sie die Daten ohne Groß-/Kleinschreibung. Verfügbar in Databricks Runtime 13.3 und höher.
rescuedDataColumn None Eine Spaltennamenzeichenfolge Gibt an, ob alle Daten gesammelt werden sollen, die aufgrund eines Datentypkonflikts oder eines Schemakonflikts (einschließlich spaltenweiser Groß-/Kleinschreibung) nicht analysiert werden können. Diese Spalte ist bei Verwendung des Autoloaders standardmäßig enthalten. Weitere Informationen finden Sie unter "Was ist die Spalte für gerettete Daten?".
COPY INTO (Legacy) unterstützt die gerettete Datenspalte nicht, da Sie das Schema nicht manuell mit COPY INTOfestlegen können. Databricks empfiehlt die Verwendung des automatischen Ladens für die meisten Aufnahmeszenarien.
singleVariantColumn None Eine Spaltennamenzeichenfolge Gibt an, ob das gesamte JSON-Dokument aufgenommen werden soll, analysiert in eine einzelne Variant-Spalte mit der angegebenen Zeichenfolge als Namen der Spalte. Wenn nicht festgelegt, werden die JSON-Felder in ihre eigenen Spalten aufgenommen.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Das Format zum Analysieren von Zeitstempelzeichenfolgen.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Das Format für die Analyse des Zeitstempels ohne Zeitzonenzeichenfolgen .The format for parsing timestamp without timezone (TimestampNTZType) strings.
timeZone None Eine java.time.ZoneId Zeichenfolge Die java.time.ZoneId, die beim Analysieren von Zeitstempeln und Datumsangaben verwendet werden soll.
upgradeExceptionAsBadRecord false true, false Gibt an, ob Typupgrade-Ausnahmen behandelt werden sollen (z. B. wenn ein Wert nicht auf den deklarierten Spaltentyp erweitert werden kann) als ungültige Datensätze, anstatt eine Ausnahme auszuwerfen.

Kafka

Eine vollständige Liste der Kafka-Leseoptionen finden Sie unter DataStreamReader Kafka-Optionen. Die folgenden Optionen gelten nur für Batchlesevorgänge.spark.read.format("kafka")

Schlüssel Vorgabe Gültige Werte Description
endingOffsets latest latest, oder eine JSON-Offsetzeichenfolge Wo das Lesen beendet werden soll. In der JSON-Zeichenfolge -1 ist der neueste Offset angegeben. -2, bei dem es sich um den frühesten Offset handelt, ist nicht als Endoffset zulässig. Dies ist ein Beispiel für eine JSON-Offsetzeichenfolge: {"topicA":{"0":50,"1":-1}}.
endingOffsetsByTimestamp None Eine JSON-Zeitstempelzeichenfolge Verschiebungen pro Partition, die als Zeitstempel in Millisekunden angegeben sind. Beispiel: {"topicA":{"0":2000,"1":3000}}.
endingTimestamp None Positive ganze Zahlen oder 0 Globaler Endzeitstempel in Millisekunden, der auf alle Partitionen angewendet wird.

ORC

Die folgenden Optionen gelten beim Lesen von ORC-Dateien.

Schlüssel Vorgabe Gültige Werte Description
mergeSchema false true, false Gibt an, ob das Schema über mehrere Dateien hinweg abgeleitet und das Schema der einzelnen Dateien zusammengeführt werden soll.

Parkett

Die folgenden Optionen gelten beim Lesen von Parkettdateien.

Schlüssel Vorgabe Gültige Werte Description
datetimeRebaseMode LEGACY EXCEPTION, LEGACYCORRECTED Steuert, ob DATE- und TIMESTAMP-Werte auf dem gregorianischen Kalender und dem proleptischen gregorianischen Kalender basieren sollen.
int96RebaseMode LEGACY EXCEPTION, LEGACYCORRECTED Steuert, ob INT96-Zeitstempelwerte auf dem gregorianischen Kalender und dem proleptischen gregorianischen Kalender basieren sollen.
mergeSchema false true, false Gibt an, ob das Schema über mehrere Dateien hinweg abgeleitet und das Schema der einzelnen Dateien zusammengeführt werden soll.
readerCaseSensitive true true, false Diese Option gibt das Verhalten bei Groß- und Kleinschreibung an, wenn rescuedDataColumn aktiviert ist. Wenn true, retten Sie die Datenspalten, deren Namen sich vom Schema unterscheiden. Wenn "false" lautet, lesen Sie die Daten ohne Groß-/Kleinschreibung.
rescuedDataColumn None Eine Spaltennamenzeichenfolge Gibt an, ob alle Daten, die aufgrund eines Datentypkonflikts oder eines Schemakonflikts (einschließlich der Schreibweise von Spaltennamen) nicht geparst werden können, in einer separaten Spalte gesammelt werden sollen. Diese Spalte ist bei Verwendung des Autoloaders standardmäßig enthalten. Weitere Informationen finden Sie unter "Was ist die Spalte mit den geretteten Daten?".
COPY INTO (Legacy) unterstützt die gerettete Datenspalte nicht, da Sie das Schema nicht manuell mit COPY INTOfestlegen können. Databricks empfiehlt die Verwendung des automatischen Ladens für die meisten Aufnahmeszenarien.

Statusspeicher

Verwenden Sie diese Optionen mit spark.read.format("statestore") oder der read_statestore Tabellenwertfunktion, um Strukturierte Streaming-Statusdaten zu lesen. Weitere Informationen finden Sie unter Lesen von strukturierten Streamingstatusinformationen.

Schlüssel Vorgabe Gültige Werte Description
batchId Neueste Batch-ID Positive ganze Zahlen oder 0 Der Zielbatch, aus dem gelesen werden soll. Wird verwendet, um einen früheren Status der Abfrage abzufragen. Der Batch muss committet, aber noch nicht bereinigt werden.
operatorId 0 Positive ganze Zahlen oder 0 Der Zieloperator, aus dem gelesen werden soll. Wird verwendet, wenn die Abfrage über mehrere zustandsbehaftete Operatoren verfügt.
storeName DEFAULT Beliebige Zeichenfolge Der Name des Zielstatusspeichers, aus dem gelesen werden soll. Wird verwendet, wenn der zustandsbehaftete Operator über mehrere Instanzen des Zustandsspeichers verfügt. Sie müssen entweder storeName oder joinSide für einen Stream-Stream-Join angeben, aber nicht für beides.
joinSide None left, right Die Zielseite, aus der für eine Streamstream-Verknüpfung gelesen werden soll. Sie müssen entweder storeName oder joinSide für einen Stream-Stream-Join angeben, aber nicht für beides.
snapshotStartBatchId None Positive ganze Zahlen oder 0 Die Batch-ID der Momentaufnahme, die beim Lesen als Ausgangspunkt verwendet werden soll. Der Leser erstellt den Zustand neu, indem Änderungen von dieser Momentaufnahme bis zur batchIdWiedergabe wiedergegeben werden. Nützlich, wenn eine Momentaufnahme beschädigt ist. Muss zusammen mit snapshotPartitionId. Kann nicht mit readChangeFeed. Unterstützt den HDFS-gesicherten Zustandsspeicher und den RocksDB-Zustandsspeicher mit aktivierter Changelog-Prüfpunkterstellung. Verfügbar in Databricks Runtime 15.4 LTS und höher.
snapshotPartitionId None Positive ganze Zahlen oder 0 Wenn angegeben, liest die Abfrage nur diese Partition. Muss zusammen mit snapshotStartBatchId. Kann nicht mit readChangeFeed. Verfügbar in Databricks Runtime 15.4 LTS und höher.
readChangeFeed false true, false Wenn true, gibt Zustandsänderungen in einem angegebenen Bereich von Batches zwischen changeStartBatchId und changeEndBatchId. Erfordert changeStartBatchId. Kann nicht mit joinSide, , batchId, snapshotStartBatchIdoder snapshotPartitionId. Verfügbar in Databricks Runtime 16.4 LTS und höher.
Ausführliche Informationen finden Sie unter Änderungen des Status "Strukturiertes Streaming lesen".
changeStartBatchId None Positive ganze Zahlen oder 0 Die Startbatch-ID für den Änderungsfeedbereich. Erforderlich, wenn readChangeFeed gleich true ist. Gilt nur, wenn readChangeFeed sie auf true. Verfügbar in Databricks Runtime 16.4 LTS und höher.
changeEndBatchId Neueste Batch-ID Positive ganze Zahlen oder 0 Die endende Batch-ID für den Änderungsfeedbereich. Muss größer oder gleich sein.changeStartBatchId Gilt nur, wenn readChangeFeed sie auf true. Verfügbar in Databricks Runtime 16.4 LTS und höher.
stateVarName None Beliebige Zeichenfolge Der zu lesende Statusvariablenname. Der Name der Statusvariablen ist der eindeutige Name jeder Variablen innerhalb der init Funktion eines StatefulProcessor vom transformWithState Operator verwendeten. Erforderlich, wenn Sie den transformWithState Operator verwenden. Verfügbar in Databricks Runtime 16.4 LTS und höher.
readRegisteredTimers false true, false Wenn true, liest registrierte Zeitgeber, die vom transformWithState Operator verwendet werden. Gilt nur für den transformWithState Operator. Verfügbar in Databricks Runtime 16.4 LTS und höher.
flattenCollectionTypes true true, false Wenn truedie datensätze, die für Karten- und Listenzustandsvariablen zurückgegeben werden, wird abgeflacht. Wenn false, gibt Datensätze als Spark SQL Array oder Map. Gilt nur für den transformWithState Operator. Verfügbar in Databricks Runtime 16.4 LTS und höher.

Text

Die folgenden Optionen gelten beim Lesen von Textdateien.

Schlüssel Vorgabe Gültige Werte Description
encoding UTF-8 Ein java.nio.charset.Charset Name Der Name der Codierung des TEXT-Dateizeilentrennzeichens. Der Inhalt der Datei ist von dieser Option nicht betroffen und wird as-isgelesen.
lineSep Keine, die umfasst \rund \r\n\n Eine Zeichenfolge Eine Zeichenfolge zwischen zwei aufeinander folgenden TEXT-Datensätzen.
wholeText false true, false Gibt an, ob eine Datei als einzelner Datensatz gelesen werden soll.

XML

Die folgenden Optionen gelten beim Lesen von XML-Dateien.

Schlüssel Vorgabe Gültige Werte Description
rowTag None Beliebige Zeichenfolge Das Reihen-Tag der XML-Dateien, die als Reihe behandelt werden sollen. Im XML-Beispiel <book> <page><page>...<book> ist der entsprechende Wert page. Diese Option muss angegeben werden.
samplingRatio 1.0 0.0 bis 1.0 Hiermit wird ein Bruchteil von Zeilen definiert, die für den Schemarückschluss verwendet werden. Diese Option wird von integrierten XML-Funktionen ignoriert.
excludeAttribute false true, false Gibt an, ob Attribute in Elementen ausgeschlossen werden sollen.
mode None PERMISSIVE, DROPMALFORMEDFAILFAST Modus für den Umgang mit beschädigten Datensätzen beim Parsen.
  • PERMISSIVE: Fügt bei beschädigten Datensätzen die nicht wohlgeformte Zeichenfolge in ein durch columnNameOfCorruptRecord konfiguriertes Feld ein und legt nicht wohlgeformte Felder auf null fest. Um beschädigte Datensätze beizubehalten, können Sie in einem benutzerdefinierten Schema ein Feld des Typs string mit dem Namen columnNameOfCorruptRecord festlegen. Wenn ein Schema nicht über das Feld verfügt, werden beschädigte Datensätze beim Parsen gelöscht. Beim Ableiten eines Schemas fügt der Parser in einem Ausgabeschema ein columnNameOfCorruptRecord-Feld implizit hinzu.
  • DROPMALFORMED: Ignoriert beschädigte Datensätze. Dieser Modus wird für integrierte XML-Funktionen nicht unterstützt.
  • FAILFAST: Eine Ausnahme wird ausgelöst, wenn der Parser beschädigte Datensätze erkennt.
inferSchema true true, false Versucht bei true, einen geeigneten Typ für jede resultierende DataFrame-Spalte abzuleiten. Bei false weisen alle resultierenden Spalten den Typ string auf. Diese Option wird von integrierten XML-Funktionen ignoriert.
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord Eine Spaltennamenzeichenfolge Ermöglicht das Umbenennen des neuen Felds, das eine falsch formatierte Zeichenfolge enthält, die im PERMISSIVE Modus erstellt wurde.
attributePrefix None Beliebige Zeichenfolge Das Präfix für Attribute, um Attribute von Elementen zu unterscheiden. Dies wird das Präfix für Feldnamen sein. Der Standardwert ist _. Kann zum Lesen von XML-Code leer sein, jedoch nicht zum Schreiben. Gilt auch für DataFrameWriter-XML-Optionen.
valueTag _VALUE Beliebige Zeichenfolge Dies ist das Tag, das für die Zeichendaten in Elementen verwendet wird, die ebenfalls Attribute oder untergeordnete Elemente enthalten. Der Benutzer kann das valueTag-Feld im Schema angeben oder es wird während des Schemaabgleichs automatisch hinzugefügt, wenn Zeichendaten in Elementen mit anderen Elementen oder Attributen vorkommen. Gilt auch für DataFrameWriter-XML-Optionen.
encoding UTF-8 Ein java.nio.charset.Charset Name Zum Lesen werden die XML-Dateien durch den angegebenen Codierungstyp decodiert. Gibt zum Schreiben die Codierung (Zeichensatz) gespeicherter XML-Dateien an. Diese Option wird von integrierten XML-Funktionen ignoriert. Gilt auch für DataFrameWriter-XML-Optionen.
ignoreSurroundingSpaces true true, false Gibt an, ob Leerzeichen, die Werte umgeben, übersprungen werden müssen. Zeichendaten, die ausschließlich aus Leerzeichen bestehen, werden ignoriert.
rowValidationXSDPath None Eine Dateipfadzeichenfolge Pfad zu einer optionalen XSD-Datei, die verwendet wird, um den XML-Code für jede Zeile einzeln zu überprüfen. Zeilen, die nicht überprüft werden, werden wie Analysefehler behandelt. Die XSD wirkt sich nicht auf das Schema aus, unabhängig davon, ob es angegeben oder abgeleitet ist.
ignoreNamespace false true, false Wenn true aktiv ist, werden die Präfixe von Namespaces für XML-Elemente und -Attribute ignoriert. Die Tags <abc:author> und <def:author> werden beispielsweise so behandelt werden, als wären beide lediglich <author>. Namespaces können für das rowTag-Element nicht ignoriert werden. Das ist lediglich für seine untergeordneten Elemente möglich, die gelesen werden sollen. Das XML-Parsing ist selbst bei false nicht namespacefähig.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Benutzerdefinierte Zeitstempelformatzeichenfolge, die dem Datetime-Musterformat folgt. Dies gilt für Typ timestamp. Gilt auch für DataFrameWriter-XML-Optionen.
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Benutzerdefinierte Zeichenfolge für das Zeitstempelformat ohne Zeitzone, die dem Format des Datetime-Musters folgt. Dies gilt für den TimestampNTZType-Typ. Gilt auch für DataFrameWriter-XML-Optionen.
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Benutzerdefinierte Datumsformatzeichenfolge, die dem Datetime-Musterformat folgt. Dies gilt für den Datumstyp. Gilt auch für DataFrameWriter-XML-Optionen.
locale en-US Ein IETF BCP 47-Sprachtag Hiermit wird ein Gebietsschema als Sprachtag im IETF BCP 47-Format festgelegt. Beispiel: locale wird beim Parsen von Daten und Zeitstempeln verwendet.
nullValue Zeichenfolge null Beliebige Zeichenfolge Legt die Darstellung der Zeichenfolge eines NULL-Werts fest. Wenn dies der Fall ist null, schreibt der Parser keine Attribute und Elemente für Felder. Gilt auch für DataFrameWriter-XML-Optionen.
readerCaseSensitive true true, false Gibt das Verhalten bei der Berücksichtigung der Groß-/Kleinschreibung an, wenn rescuedDataColumn aktiviert ist. Wenn true, retten Sie die Datenspalten, deren Namen sich vom Schema unterscheiden. Wenn "false" lautet, lesen Sie die Daten ohne Groß-/Kleinschreibung.
rescuedDataColumn None Eine Spaltennamenzeichenfolge Gibt an, ob alle Daten, die aufgrund eines Datentypkonflikts oder eines Schemakonflikts (einschließlich der Groß-/Kleinschreibung von Spaltennamen) nicht analysiert werden können, in eine separate Spalte erfasst werden sollen. Diese Spalte ist bei Verwendung des Autoloaders standardmäßig enthalten. Weitere Informationen finden Sie in der Spalte "Gerettete Daten". COPY INTO (Legacy) unterstützt die gerettete Datenspalte nicht, da Sie das Schema nicht manuell mit COPY INTOfestlegen können. Databricks empfiehlt die Verwendung des automatischen Ladens für die meisten Aufnahmeszenarien.
singleVariantColumn none Eine Spaltennamenzeichenfolge Gibt den Namen der einzelnen Variantenspalte an. Wenn diese Option zum Lesen angegeben ist, analysieren Sie den gesamten XML-Eintrag in eine einzelne Variant-Spalte mit dem angegebenen Optionszeichenfolgenwert als Namen der Spalte. Wenn diese Option zum Schreiben angegeben ist, schreiben Sie den Wert der einzelnen Variant-Spalte in XML-Dateien. Gilt auch für DataFrameWriter-XML-Optionen.
useLegacyXMLParser true true, false Gibt an, ob der Legacy-XML-Parser verwendet werden soll. Der Legacyparser verfügt über eine weniger strenge Überprüfung für falsch formatierte Inhalte, ist jedoch weniger speichereffizient. Legen Sie fest, dass false sie sich für den strengeren Standardparser entscheiden soll.
wildcardColName xs_any Eine Spaltennamenzeichenfolge Der Spaltenname, der zum Erfassen von XML-Elementen verwendet wird, die mit dem Wildcard(xs:any)-Schemaelement übereinstimmen. Kann nicht zusammen mit rescuedDataColumn.

DataStreamReader-Optionen

Verwenden Sie diese Optionen zum DataStreamReader.option() Konfigurieren von Streaming-Lesevorgängen aus Delta Lake-Tabellen und anderen dateibasierten Quellen.

Informationen zu Dateiformatoptionen (JSON, CSV, Parkett und andere) finden Sie unter DataFrameReader-Optionen.

Optionen für das automatische Laden (cloudFiles.*) finden Sie unter "Auto Loader".

Example

Im folgenden Beispiel wird für einen Delta Lake-Tabellendatenstrom festgelegt maxFilesPerTrigger10 :

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

Gemeinsam

Die folgenden Optionen gelten für Delta Lake-Tabellen und andere dateibasierte Streamingquellen.

Schlüssel Vorgabe Gültige Werte Description
cleanSource off off, deletearchive Behandeln von Quelldateien, nachdem sie vom Datenstrom verarbeitet wurden. off ergreift keine Aktion. delete Löscht die Quelldatei endgültig. archive verschiebt die Datei in sourceArchiveDir. Wenn dieser Wert auf ", archivesourceArchiveDir muss auch festgelegt werden. Gilt nicht für Das Streaming von Delta Lake-Tabellen.
fileNameOnly false true, false Gibt an, ob bereits verarbeitete Dateien nur anhand des Dateinamens und nicht anhand des vollständigen Pfads identifiziert werden sollen. Wenn true, Dateien auf unterschiedlichen Pfaden mit demselben Dateinamen behandelt werden wie die gleiche Datei und werden nicht erneut verarbeitet. Gilt nicht für Das Streaming von Delta Lake-Tabellen.
latestFirst false true, false Gibt an, ob die zuletzt geänderten Dateien zuerst in jedem Mikrobatch verarbeitet werden sollen. Nützlich, wenn Sie die neuesten Daten so schnell wie möglich verarbeiten möchten. Wenn true und maxFilesPerTrigger oder maxBytesPerTrigger festgelegt wird, maxFileAge wird ignoriert. Gilt nicht für Das Streaming von Delta Lake-Tabellen.
maxBytesPerTrigger None Positive Zahlen Soft maximum for the amount of data processed for each micro-batch. Ein Batch kann mehr als den Grenzwert verarbeiten, wenn die kleinste Eingabeeinheit sie überschreitet. Bei Verwendung zusammen mit maxFilesPerTriggerdem Mikrobatch verarbeitet die Mikrobatch daten, bis beide Grenzwerte zuerst erreicht sind.
Verwenden Sie für den Autoloader stattdessen cloudFiles.maxBytesPerTrigger. Siehe "Allgemein".
maxCachedFiles 10000 Positive ganze Zahlen oder 0 Maximale Anzahl nicht verarbeiteter Dateien, die für nachfolgende Mikrobatches zwischengespeichert werden sollen. Legen Sie fest, 0 dass die Zwischenspeicherung deaktiviert wird. Erhöhen Sie diesen Wert, wenn das Quellverzeichnis viele neue Dateien für jeden Trigger enthält. Gilt nicht für Das Streaming von Delta Lake-Tabellen.
maxFileAge 7d Eine Dauerzeichenfolge, z 7d . B. oder 4h Maximales Alter von Dateien, die für die Verarbeitung berücksichtigt werden, relativ zum Zeitstempel der zuletzt geänderten Datei anstelle der aktuellen Systemzeit. Dateien, die älter als dieser Schwellenwert sind, werden ignoriert. Wird ignoriert, wenn latestFirst und truemaxFilesPerTrigger oder maxBytesPerTrigger festgelegt wird. Gilt nicht für Das Streaming von Delta Lake-Tabellen.
maxFilesPerTrigger 1000 für Delta Lake und Auto Loader. Kein Maximum für andere dateibasierte Quellen. Positive Zahlen Obere Grenze für die Anzahl der neuen Dateien, die in jedem Mikrobatch verarbeitet werden. Bei Verwendung zusammen mit maxBytesPerTriggerdem Mikrobatch verarbeitet die Mikrobatch daten, bis beide Grenzwerte zuerst erreicht sind.
Verwenden Sie für den Autoloader stattdessen cloudFiles.maxFilesPerTrigger. Siehe "Allgemein".
sourceArchiveDir None Eine Pfadzeichenfolge Pfad zum Archivverzeichnis, wenn cleanSource auf .archive Quelldateien werden nach der Verarbeitung in diesen Pfad verschoben, wobei ihre relative Verzeichnisstruktur beibehalten wird. Gilt nicht für Das Streaming von Delta Lake-Tabellen.

AutoLadeprogramm

Verwenden Sie diese Optionen mit der cloudFiles Quelle, um das automatische Ladeprogramm für die Streamingaufnahme aus dem Cloudspeicher zu konfigurieren. Optionen, die für die cloudFiles Quelle spezifisch sind, werden präfixiert cloudFiles , um sie in einem separaten Namespace von anderen Optionen für strukturiertes Streaming zu behalten.

Gemeinsam

Die folgenden Optionen gelten für alle Konfigurationen des automatischen Ladeprogramms.

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.allowOverwrites false true, false Gibt an, ob Änderungen der Eingabeverzeichnisdatei zum Überschreiben vorhandener Daten zulässig sind.
Informationen zu Konfigurationseinschränkungen finden Sie unter Verarbeitet das automatische Laden die Datei erneut, wenn die Datei angefügt oder überschrieben wird?.
cloudFiles.backfillInterval None Eine Dauerzeichenfolge, z 1 day . B. oder 1 week Auto Loader kann asynchrone Rückfüllungen in einem bestimmten Intervall auslösen. Weitere Informationen finden Sie unter Regelmäßige Rückfüllungen mit cloudFiles.backfillInterval auslösen.
Nicht verwenden, wenn cloudFiles.useManagedFileEvents auf true eingestellt ist.
cloudFiles.cleanSource OFF OFF, DELETEMOVE Gibt an, ob verarbeitete Dateien automatisch aus dem Eingabeverzeichnis gelöscht oder verschoben werden sollen. Bei Festlegung auf OFF (Standard) werden keine Dateien gelöscht.
Wenn diese Einstellung auf DELETE"Auto Loader" festgelegt ist, werden Dateien automatisch 30 Tage nach der Verarbeitung gelöscht. Dazu muss das automatische Laden über Schreibberechtigungen für das Quellverzeichnis verfügen.
Wenn dieser Wert auf "Auto Loader" festgelegt ist MOVE, werden Dateien nach der Verarbeitung in cloudFiles.cleanSource.moveDestination 30 Tagen automatisch an den angegebenen Speicherort verschoben. Autoloader muss über Schreibberechtigungen für das Quellverzeichnis sowie den Verschiebungsort verfügen.
Eine Datei wird als verarbeitet betrachtet, wenn sie einen Wert ungleich Null für commit_time das Ergebnis der cloud_files_state Tabellenwertfunktion aufweist. Siehe cloud_files_state Tabellenwertfunktion. Die 30-tägige zusätzliche Wartezeit nach der Verarbeitung kann mithilfe der cloudFiles.cleanSource.retentionDurationKonfiguration konfiguriert werden.
Überprüfen Sie die folgenden Überlegungen, bevor Sie folgendes aktivieren cloudFiles.cleanSource:
  • Azure Databricks empfiehlt die Verwendung dieser Option nicht, wenn mehrere Datenströme daten vom Quellspeicherort verbrauchen, da der schnellste Verbraucher die Dateien löscht und sie nicht in den langsameren Quellen aufgenommen werden.
  • Zum Aktivieren dieses Features ist das automatische Laden erforderlich, um zusätzlichen Zustand im Prüfpunkt beizubehalten, was zu Leistungsaufwand kommt, aber eine verbesserte Observierbarkeit über die cloud_files_state Tabellenwertfunktion ermöglicht. Siehe cloud_files_state Tabellenwertfunktion.
  • cleanSource verwendet die aktuelle Einstellung, um zu entscheiden, ob oder MOVEDELETE eine bestimmte Datei. Angenommen, die Einstellung war MOVE, als die Datei ursprünglich verarbeitet wurde, wurde aber in DELETE geändert, als die Datei 30 Tage später zum Kandidaten für die Bereinigung wurde. In diesem Fall löscht cleanSource die Datei.
  • Dateien werden nicht garantiert bereinigt, sobald die retentionDuration Dateien ablaufen. Um die Kosten niedrig zu halten, löscht auto Loader Dateien gleichzeitig mit der Datenstromverarbeitung und beendet, sobald die Datenstromverarbeitung abgeschlossen oder beendet wird. Dateien, die für die Bereinigung geeignet waren, während der Datenstromverarbeitung jedoch nicht bereinigt werden konnten, werden beim nächsten Ausführen des automatischen Ladeprogramms aufgenommen.

Verfügbar in Databricks Runtime 16.4 und höher.
cloudFiles.cleanSource.retentionDuration 30 days Eine CalendarInterval-Zeichenfolge wie 14 days, oder 2 weeks1 month Die Anzahl der Zeit, die gewartet werden muss, bevor verarbeitete Dateien zu Archivierungskandidaten mit cleanSource werden. Für DELETE muss es größer als 7 Tage sein. Keine Mindesteinschränkung für MOVE.
Verfügbar in Databricks Runtime 16.4 und höher.
cloudFiles.cleanSource.moveDestination None Ein Cloudspeicher- oder Unity-Katalog-Volumepfad Pfad zum Archivieren verarbeiteter Dateien, wenn cloudFiles.cleanSource auf MOVE gesetzt wird. Dies kann ein Cloudspeicherpfad oder ein Unity Catalog-Volumepfad sein (z. B /Volumes/my_catalog/my_schema/my_volume/archive/. ).
Der Speicherort muss:
  • Kein untergeordnetes Element des Quellverzeichnisses. Wenn Sie das Verschiebungsziel im Quellverzeichnis platzieren, werden die archivierten Dateien erneut aufgenommen.
  • Befinden Sie sich an demselben externen Speicherort, Volume oder DBFS-Mount wie die Quelle. Cross-Bucket- und Cross-Container-Verschiebungen werden nicht unterstützt und erzeugen einen Fehler.

Auto Loader muss Schreibberechtigungen für dieses Verzeichnis besitzen.
Verfügbar in Databricks Runtime 16.4 und höher.
cloudFiles.format Keine (erforderliche Option) avro, binaryFile, csv, json, , orc, parquet, text, xml Das Datendateiformat im Quellpfad. Gültige Werte sind:
cloudFiles.includeExistingFiles true true, false Gibt an, ob vorhandene Dateien in den Eingabepfad für die Streamverarbeitung einbezogen werden, oder ob nur neue Dateien verarbeitet werden sollen, die nach der Ersteinrichtung eingehen. Diese Option wird nur ausgewertet, wenn Sie einen Stream zum ersten Mal starten. Das Ändern dieser Option nach dem Neustart des Streams hat keine Auswirkungen.
cloudFiles.inferColumnTypes false true, false Gibt an, ob exakte Spaltentypen abgeleitet werden sollen, wenn der Schemarückschluss verwendet wird. Standardmäßig werden Spalten als Zeichenfolgen abgeleitet, wenn JSON- und CSV-Datasets abgeleitet werden. Weitere Informationen finden Sie unter schemainference .
cloudFiles.maxBytesPerTrigger None Eine Bytezeichenfolge, z. B. 10g Die maximale Anzahl neuer Bytes, die in jedem Trigger verarbeitet werden sollen. Dies ist ein weicher Maximalwert. Wenn Sie über Dateien mit jeweils 3 GB verfügen, verarbeitet Azure Databricks 12 GB in einem Mikrobatch. Eine einzelne Datei wird niemals auf Mikrobatches aufgeteilt; es wird immer vollständig innerhalb eines einzigen verarbeitet, auch wenn seine Größe diesen Grenzwert überschreitet. Bei Verwendung in Kombination mit cloudFiles.maxFilesPerTrigger steigt der Verbrauch von Azure Databricks bis zur Untergrenze von cloudFiles.maxFilesPerTrigger oder cloudFiles.maxBytesPerTrigger, je nachdem, welcher Wert zuerst erreicht wird. Diese Option hat keine Auswirkung, wenn sie mit Trigger.Once() verwendet wird (Trigger.Once() ist veraltet).
In Databricks Runtime 18.0 und höher ist diese Option dynamisch konfiguriert und muss nicht manuell festgelegt werden.
cloudFiles.maxFileAge None Eine Dauerzeichenfolge Gibt an, wie lange ein Dateiereignis zu Deduplizierungszwecken nachverfolgt wird. Databricks empfiehlt, diesen Parameter nur dann anzupassen, wenn Sie Daten in einer Größenordnung von mehreren Millionen Dateien pro Stunde erfassen. Weitere Informationen finden Sie im Abschnitt zur Dateiereignisverfolgung .
Eine zu aggressive Optimierung von cloudFiles.maxFileAge kann zu Problemen mit der Datenqualität führen, z. B. zu doppelter Erfassung oder fehlenden Dateien. Daher empfiehlt Databricks eine konservative Einstellung für cloudFiles.maxFileAge, z. B. 90 Tage. Dies entspricht ungefähr der Empfehlung vergleichbarer Datenerfassungslösungen.
cloudFiles.maxFilesPerTrigger 1000 Positive Zahlen Die maximale Anzahl neuer Dateien, die in jedem Trigger verarbeitet werden sollen. Bei Verwendung in Kombination mit cloudFiles.maxBytesPerTrigger steigt der Verbrauch von Azure Databricks bis zur Untergrenze von cloudFiles.maxFilesPerTrigger oder cloudFiles.maxBytesPerTrigger, je nachdem, welcher Wert zuerst erreicht wird. Diese Option hat keine Auswirkung, wenn sie mit Trigger.Once() (veraltet) verwendet wird.
In Databricks Runtime 18.0 und höher ist diese Option dynamisch konfiguriert und muss nicht manuell festgelegt werden.
cloudFiles.partitionColumns None Eine durch Trennzeichen getrennte Liste von Spaltennamen Eine durch Trennzeichen getrennte Liste von Partitionsspalten im Hive-Stil, die Sie aus der Verzeichnisstruktur der Dateien abgeleitet möchten. Partitionsspalten im Strukturstil sind Schlüssel-Wert-Paare, die durch ein Gleichheitszeichen kombiniert werden, z <base-path>/a=x/b=1/c=y/file.format. B. . In diesem Beispiel sind die Partitionsspalten a, bund c. Standardmäßig werden diese Spalten automatisch zu Ihrem Schema hinzugefügt, wenn Sie schemabasierte Ableitung verwenden und angeben, aus denen <base-path> Daten geladen werden sollen. Wenn Sie ein Schema angeben, erwartet auto Loader, dass diese Spalten im Schema enthalten sind. Wenn Sie diese Spalten nicht als Teil des Schemas verwenden möchten, können Sie angeben, dass "" diese Spalten ignoriert. Darüber hinaus können Sie diese Option verwenden, wenn Spalten aus dem Dateipfad in komplexen Verzeichnisstrukturen wie im untenstehenden Beispiel abgeleitet werden sollen.
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
Die Spezifizierung von cloudFiles.partitionColumns als year,month,day gibt year=2022 für file1.csv zurück, aber die Spalten month und day sind null.
month und day werden richtig analysiert für file2.csv und file3.csv.
cloudFiles.schemaEvolutionMode addNewColumns wenn kein Schema angegeben wird, none andernfalls addNewColumns, , nonerescuefailOnNewColumns Der Modus zum Weiterentwickeln des Schemas, wenn neue Spalten in den Daten ermittelt werden. Standardmäßig werden Spalten als Zeichenfolgen abgeleitet, wenn JSON-Datasets abgeleitet werden. Weitere Informationen finden Sie in der Schemaentwicklung .
cloudFiles.schemaHints None Eine Schemazeichenfolge Schemainformationen, die Sie während der Schemaausleitung für das automatische Laden angeben. Weitere Informationen finden Sie unter Schemahinweise .
cloudFiles.schemaLocation Keine (erforderlich, um das Schema abzuleiten) Eine Pfadzeichenfolge Der Speicherort, an dem das abgeleitete Schema und nachfolgende Änderungen gespeichert werden. Weitere Informationen finden Sie unter schemainference .
cloudFiles.useStrictGlobber false true, false Gibt an, ob ein strenger Globber verwendet werden soll, der dem Standard-Globbingverhalten anderer Dateiquellen in Apache Spark entspricht. Weitere Informationen finden Sie unter "Allgemeine Datenlademuster ". Verfügbar in Databricks Runtime 12.2 LTS und höher.
cloudFiles.validateOptions true true, false Gibt an, ob die Autoloader-Optionen überprüft werden und ob bei unbekannten oder inkonsistenten Optionen ein Fehler ausgegeben werden soll.

Verzeichnisauflistung

Die folgende Option gilt bei Verwendung des Verzeichnisauflistungsmodus.

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.useIncrementalListing (veraltet) auto on Databricks Runtime 17.2 and below, false on Databricks Runtime 17.3 and above auto, truefalse Diese Funktion wurde eingestellt. Databricks empfiehlt die Verwendung des Dateibenachrichtigungsmodus mit Dateiereignissen anstelle von cloudFiles.useIncrementalListing.
Gibt an, ob im Verzeichnisauflistungsmodus anstelle der vollständigen Auflistung die inkrementelle Auflistung verwendet werden soll. Standardmäßig versucht der Autoloader, automatisch zu ermitteln, ob ein bestimmtes Verzeichnis für die inkrementelle Auflistung geeignet ist. Sie können explizit die inkrementelle Auflistung oder die vollständige Verzeichnisauflistung verwenden, indem Sie den Wert true bzw. false festlegen.
Eine nicht ordnungsgemäße Aktivierung der inkrementellen Auflistung in einem nicht lexikalisch sortierten Verzeichnis verhindert, dass der Autoloader neue Dateien erkennt.
Arbeitet mit Azure Data Lake Storage (abfss://), S3 (s3://) und GCS (gs://).
Verfügbar in Databricks Runtime 9.1 LTS und höheren Versionen.

Dateibenachrichtigung

Informationen zum Konfigurieren des Dateibenachrichtigungsmodus, einschließlich der erforderlichen Cloudberechtigungen, Setupanweisungen und Authentifizierungsmethoden, finden Sie unter Konfigurieren von Datenströmen für das automatische Laden im Dateibenachrichtigungsmodus.

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.fetchParallelism 1 Positive Zahlen Anzahl der Threads, die beim Abrufen von Nachrichten aus dem Warteschlangendienst verwendet werden.
Nicht verwenden, wenn cloudFiles.useManagedFileEvents auf true eingestellt ist.
cloudFiles.pathRewrites None Eine JSON-Zuordnungszeichenfolge Erforderlich nur, wenn Sie eine queueUrl Dateibenachrichtigung von mehreren S3-Buckets empfangen und Bereitstellungspunkte verwenden möchten, die für den Zugriff auf Daten in diesen Containern konfiguriert sind. Verwenden Sie diese Option, um das Präfix des bucket/key-Pfads mit dem Bereitstellungspunkt umzuschreiben. Nur Präfixe können umgeschrieben werden. Zum Beispiel wird bei der Konfiguration {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"} der Pfad s3://<databricks-mounted-bucket>/path/2017/08/fileA.json in dbfs:/mnt/data-warehouse/2017/08/fileA.json umgeschrieben.
Nicht verwenden, wenn cloudFiles.useManagedFileEvents auf true eingestellt ist.
cloudFiles.resourceTag None Schlüsselwert-Tagzeichenfolgen Eine Reihe von Schlüssel-Wert-Tagpaaren zum Zuordnen und Identifizieren verwandter Ressourcen. Beispiel:
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
Nicht verwenden, wenn cloudFiles.useManagedFileEvents auf true eingestellt ist. Legen Sie stattdessen Ressourcentags mithilfe der Cloudanbieterkonsole fest.
Weitere Informationen finden Sie unter Ressourcentags für Cloudanbieter.
cloudFiles.useManagedFileEvents false true, false Wenn true festgelegt ist, verwendet Auto Loader den Dateiereignisdienst, um Dateien an Ihrem externen Standort zu entdecken. Sie können diese Option nur verwenden, wenn sich der Ladepfad an einem externen Speicherort mit aktivierten Dateivorgängen befindet. Siehe Verwenden des Dateibenachrichtigungsmodus mit Dateiereignissen.
Dateiereignisse bieten die Leistung auf Benachrichtigungsebene bei der Dateiermittlung, da das automatische Laden neue Dateien nach der letzten Ausführung ermitteln kann. Im Gegensatz zur Verzeichnisauflistung muss dieser Prozess nicht alle Dateien im Verzeichnis auflisten.
Es gibt einige Situationen, in denen der Auto Loader die Verzeichnisauflistung verwendet, obwohl die Option "Dateiereignisse" aktiviert ist.
  • Beim anfänglichen Laden, wenn includeExistingFiles auf true festgelegt ist, wird eine vollständige Verzeichnisauflistung durchgeführt, um alle Dateien zu ermitteln, die im Verzeichnis vorhanden waren, bevor das automatische Laden gestartet wurde.
  • Der Dateiereignisdienst optimiert die Ermittlung von Dateien, indem er die zuletzt erstellten Dateien zwischenspeichert. Wenn das automatische Laden selten ausgeführt wird, kann dieser Cache ablaufen, und das automatische Laden greift auf die Verzeichnisauflistung zurück, um Dateien zu ermitteln und den Cache zu aktualisieren. Um dieses Szenario zu vermeiden, rufen Sie das automatische Laden mindestens einmal alle sieben Tage auf.

Informationen dazu, wann das automatische Laden mit Dateiereignissen verzeichnisauflistung verwendet wird? Eine umfassende Liste der Situationen, in der auto Loader verzeichnisauflistung mit dieser Option verwendet.
Verfügbar in Databricks Runtime 14.3 LTS und höher.
cloudFiles.listOnStart false true, false Wenn dieser Wert auf true"Auto Loader" festgelegt ist, führt das automatische Laden beim Start des Datenstroms eine vollständige Verzeichnisauflistung aus, anstatt mit dem Fortsetzungstoken im Prüfpunkt zu beginnen. Verwenden Sie diese Option, um Fehler wie z CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. B. . Erfahren Sie , wie kann ich einen CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN Fehler wiederherstellen?.
cloudFiles.useNotifications false true, false Gibt an, ob mithilfe des Dateibenachrichtigungsmodus bestimmt werden soll, ob neue Dateien verfügbar sind. Bei einer Festlegung auf false wird der Verzeichnisauflistungsmodus verwendet. Siehe "Vergleichen Sie die Modi zur Dateierkennung beim automatischen Laden".
Nicht verwenden, wenn cloudFiles.useManagedFileEvents auf true eingestellt ist.
Cloudanbieter-Ressourcentags

Auto Loader fügt standardmäßig die folgenden Schlüssel-Wert-Tag-Paare auf Best-Effort-Basis hinzu:

  • vendor: Databricks
  • path: Der Speicherort, von dem aus die Daten geladen werden. In GCP aufgrund von Bezeichnungseinschränkungen nicht verfügbar.
  • checkpointLocation: Die Position des Stream-Checkpoints. In GCP aufgrund von Bezeichnungseinschränkungen nicht verfügbar.
  • streamId: Ein global eindeutiger Bezeichner für den Stream.

Databricks behält sich diese Schlüsselnamen vor, und Sie können deren Werte nicht überschreiben.

Weitere Informationen zu Azure finden Sie unter Benennen von Warteschlangen und Metadaten und die Berichterstattung über properties.labels in Ereignisabonnements. Der Autoloader speichert diese Schlüssel-Wert-Tagpaare in JSON als Bezeichnungen.

Cloudspezifisch

Auto Loader verfügt über Optionen zum Konfigurieren der Cloudinfrastruktur für den Dateibenachrichtigungsmodus. Erforderliche Cloudberechtigungen und Einrichtungsanweisungen finden Sie unter Konfigurieren von Datenströmen für das automatische Laden im Dateibenachrichtigungsmodus.

Azure

Sie müssen Werte für alle folgenden Optionen angeben, wenn Sie angeben cloudFiles.useNotifications = true und möchten, dass das automatische Laden die Benachrichtigungsdienste für Sie einrichtet:

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.resourceGroup None Beliebige Zeichenfolge Die Azure Ressourcengruppe, in der das Speicherkonto erstellt wird.
cloudFiles.subscriptionId None Beliebige Zeichenfolge Die Azure Abonnement-ID, in der die Ressourcengruppe erstellt wird.
databricks.serviceCredential None Beliebige Zeichenfolge Der Name Ihrer Databricks-Dienstanmeldeinformationen. Verfügbar in Databricks Runtime 16.1 und höher.

Wenn keine Databricks-Dienstanmeldeinformationen verfügbar sind, können Sie stattdessen die folgenden Authentifizierungsoptionen angeben:

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.clientId None Beliebige Zeichenfolge Die Client-ID oder Anwendungs-ID des Dienstprinzipals.
cloudFiles.clientSecret None Beliebige Zeichenfolge Das Kundengeheimnis des Dienstherrn.
cloudFiles.connectionString None Eine Verbindungszeichenfolge. Die Verbindungszeichenfolge für das Speicherkonto, basierend entweder auf dem Kontozugriffsschlüssel oder der gemeinsamen Zugriffssignatur (SAS).
cloudFiles.tenantId None Beliebige Zeichenfolge Die Azure Mandanten-ID, in der der Dienstprinzipal erstellt wird.

Geben Sie die folgende Option nur an, wenn Sie das automatische Laden festlegen cloudFiles.useNotifications = true und möchten, dass eine vorhandene Warteschlange verwendet wird:

Schlüssel Vorgabe Gültige Werte Description
cloudFiles.queueName None Beliebige Zeichenfolge Der Name der Azure-Warteschlange. Wenn angegeben, verwendet die Quelle für Clouddateien direkt Ereignisse aus dieser Warteschlange, anstatt eigene Azure Event Grid- und Warteschlangenspeicherdienste einzurichten. In diesem Fall benötigen databricks.serviceCredential und cloudFiles.connectionString nur Leseberechtigungen für die Warteschlange.

Delta Lake

Die folgenden Optionen gelten beim Lesen aus einer Delta Lake-Tabelle mit spark.readStream.

Schlüssel Vorgabe Gültige Werte Description
allowSourceColumnDrop None Eine Versionsnummer oder always Legen Sie die Versionsnummer einer Delta-Tabelle fest, oder always damit der Datenstrom fortgesetzt werden kann, nachdem Spalten aus dem Quelltabellenschema gelöscht wurden. Wenn sie auf eine Versionsnummer festgelegt ist, werden alle Schemaänderungen bis zu dieser Version bestätigt. Erfordert schemaTrackingLocation. Siehe Umbenennen und Löschen von Spalten mit der Delta Lake-Spaltenzuordnung.
allowSourceColumnRename None Eine Versionsnummer oder always Legen Sie die Versionsnummer einer Delta-Tabelle fest, oder always damit der Datenstrom fortgesetzt werden kann, nachdem Die Spalten in der Quelltabelle umbenannt wurden. Wenn sie auf eine Versionsnummer festgelegt ist, werden alle Schemaänderungen bis zu dieser Version bestätigt. Erfordert schemaTrackingLocation. Siehe Umbenennen und Löschen von Spalten mit der Delta Lake-Spaltenzuordnung.
allowSourceColumnTypeChange None Eine Versionsnummer oder always Legen Sie die Versionsnummer einer Delta-Tabelle fest, oder always damit der Datenstrom fortgesetzt werden kann, nachdem Spaltentypen in der Quelltabelle geändert wurden. Wenn sie auf eine Versionsnummer festgelegt ist, werden alle Schemaänderungen bis zu dieser Version bestätigt. Erfordert schemaTrackingLocation. Weitere Informationen finden Sie unter Typerweiterung.
excludeRegex None Eine Java regex-Zeichenfolge Ein Muster für reguläre Ausdrücke. Dateien, deren Pfade mit dem Muster übereinstimmen, werden vom Streaminglesevorgang ausgeschlossen. Nützlich zum Filtern von Dateien, die nicht der erwarteten Benennungskonvention entsprechen.
failOnDataLoss true true, false Gibt an, ob die Streamingabfrage fehlschlägt, wenn Quelldaten aufgrund der ProtokollaufbewahrunglogRetentionDuration () gelöscht wurden. Legen Sie fest, dass false fehlende Daten übersprungen und die Verarbeitung fortgesetzt werden soll. Siehe Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen.
ignoreChanges (veraltet) false true, false Verfügbar in Databricks Runtime 11.3 LTS und niedriger. Sendet neu geschriebene Datendateien nach Änderungsvorgängen wie UPDATE, , MERGE INTO, DELETEoder OVERWRITE. Unveränderte Zeilen können zusammen mit neuen Zeilen ausgegeben werden, sodass nachgeschaltete Verbraucher Duplikate verarbeiten müssen. Löschungen werden nicht an nachgelagerte Systeme weitergegeben. Ersetzt durch skipChangeCommits databricks Runtime 12.2 LTS und höher.
ignoreDeletes (veraltet) false true, false Ignoriert Transaktionen, die Daten an Partitionsgrenzen löschen (nur vollständige Partitionen fallen ab). Behandelt keine Nichtpartitionslöschungen, Aktualisierungen oder andere Änderungen. Verwenden Sie stattdessen skipChangeCommits.
readChangeFeed oder readChangeData false true, false Gibt an, ob das Lesen des Änderungsdatenfeeds für die Streamingabfrage aktiviert werden soll. Wenn diese Option aktiviert ist, gibt der Datenstrom Änderungen auf Zeilenebene (Einfügungen, Aktualisierungen und Löschvorgänge) mit zusätzlichen Metadatenspalten aus. Weitere Informationen finden Sie unter Verwenden des Änderungsdatenfeeds auf Azure Databricks.
schemaTrackingLocation None Eine Pfadzeichenfolge Pfad zu einem Verzeichnis, in dem Delta Lake Schemaänderungen für das Streaminglese nachverfolgt. Erforderlich beim Streamen von Tabellen mit aktivierter Spaltenzuordnung und Verwendung von allowSourceColumn* Optionen zum Behandeln der Schemaentwicklung. Muss innerhalb checkpointLocation der Streamingabfrage sein. Siehe Umbenennen und Löschen von Spalten mit der Delta Lake-Spaltenzuordnung.
skipChangeCommits false true, false Ignoriert Transaktionen, die vorhandene Datensätze löschen oder ändern, und verarbeitet nur Anfüge. Databricks empfiehlt diese Option für die meisten Workloads, die keine Änderungsdatenfeeds verwenden. Verfügbar in Databricks Runtime 12.2 LTS und höher. Siehe Skip upstream change commits with skipChangeCommits.
startingTimestamp Neueste verfügbar Eine Zeitstempelzeichenfolge, z 2019-01-01T00:00:00.000Z . B. eine Datumszeichenfolge, z. B. 2019-01-01 Zeitstempel, um mit dem Lesen zu beginnen. Der Datenstrom liest alle Tabellenänderungen, die an oder nach dem angegebenen Zeitstempel zugesichert wurden. Wenn der Zeitstempel vor allen verfügbaren Tabellen-Commits steht, beginnt der Datenstrom mit dem frühesten verfügbaren Commit. Kann nicht zusammen mit startingVersion. Wird ignoriert, wenn der Streamingprüfpunkt bereits vorhanden ist.
startingVersion Neueste verfügbar Eine positive ganze Zahl, 0oder latest Delta-Tabellenversion, um mit dem Lesen zu beginnen. Der Datenstrom liest alle Änderungen, die an oder nach der angegebenen Version zugesichert wurden. Geben Sie latest an, dass sie nur mit den letzten Änderungen beginnen soll. Kann nicht zusammen mit startingTimestamp. Wird ignoriert, wenn der Streamingprüfpunkt bereits vorhanden ist. Siehe „Den Tabellenverlauf nutzen“.
withEventTimeOrder false true, false Teilt die Momentaufnahme der ersten Tabelle in Ereigniszeit-Buckets auf, um zu verhindern, dass Datensätze als späte Ereignisse falsch markiert und in zustandsbehafteten Abfragen mit Wasserzeichen verworfen werden. Kann nicht geändert werden, nachdem die anfängliche Momentaufnahmeverarbeitung begonnen hat, ohne den Prüfpunkt zu löschen. Verfügbar in Databricks Runtime 11.3 LTS und höher. Siehe Initialaufnahme des Prozesses, ohne Daten zu verlieren.

Kafka

Verwenden Sie diese Optionen entweder oder spark.readStream.format("kafka")spark.read.format("kafka"):

Schlüssel Vorgabe Gültige Werte Description
assign None Eine JSON-Zeichenfolge, z. B. {"topicA":[0,1],"topicB":[2,4]} Die zu verwendenden spezifischen Partitionen. Sie müssen genau eine der subscribeOptionen subscribePatternangeben assign .
failOnDataLoss true true, false Gibt an, ob die Abfrage fehlschlägt, wenn Daten verloren gegangen sind, z. B. aufgrund gelöschter Themen oder eines Offsetabzugs. Legen Sie fest, false dass fehlende Daten übersprungen werden, und fahren Sie fort.
Databricks schätzt konservative, ob Daten verloren gegangen sind. Dies kann jedoch zu falschen Alarmen führen.
fetchoffset.numretries 3 Positive ganze Zahlen oder 0 Die Anzahl der Wiederholungen beim Abrufen von Kafka-Offsets schlägt fehl.
fetchoffset.retryintervalms 1000 Positive ganze Zahlen oder 0 Das Intervall in Millisekunden zwischen Offsetabruf-Wiederholungen.
groupIdPrefix spark-kafka-source (Streaming), spark-kafka-relation (Batch) Beliebige Zeichenfolge Das angepasste Präfix, das für die automatisch generierte Kafka-Consumergruppen-ID verwendet werden soll. Wenn kafka.group.id dieser explizit festgelegt ist, ignoriert der Connector diese Option.
kafka.group.id None Beliebige Zeichenfolge Die Kafka-Consumergruppen-ID, die beim Lesen verwendet werden soll. Verwenden Sie vorsichtslos: Abfragen, die dieselbe Gruppen-ID gemeinsam nutzen, stören sich gegenseitig und lesen möglicherweise nur Teildaten. Dies kann auftreten, wenn gleichzeitige Batch- und Streamingworkloads ausgeführt werden, oder wenn Abfragen schnell neu gestartet werden. Wenn festgelegt, groupIdPrefix wird ignoriert. Um Probleme zu minimieren, legen Sie die Kafka-Consumerkonfiguration session.timeout.ms auf einen kleinen Wert fest.
includeHeaders false true, false Gibt an, ob Kafka-Nachrichtenkopfzeilen als Spalte in die Ausgabe eingeschlossen werden sollen.
kafkaconsumer.polltimeoutms None Positive Zahlen Das Timeout in Millisekunden für den Kafka-Verbraucheranruf poll() .
kafka.bootstrap.servers None Eine durch Trennzeichen getrennte Liste von host:port Zeichenfolgen Eine durch Trennzeichen getrennte Liste der Host:Port-Adressen für Kafka-Broker. Legt die Eigenschaft des bootstrap.servers Kafka-Clients fest.
Wenn Sie feststellen, dass keine Daten aus Kafka vorhanden sind, überprüfen Sie diese Brokeradressenliste auf falsche Adressen. Wenn die Brokeradressliste falsch ist, treten möglicherweise keine Fehler auf. Kafka-Clients gehen davon aus, dass die Broker irgendwann verfügbar sind, und versuchen Sie es für immer, wenn sie Netzwerkfehler erhalten.
maxRecordsPerPartition None Positive Zahlen Die maximale Anzahl von Datensätzen für jede Spark-Partition. Bei Festlegung teilt der Connector Kafka-Partitionen auf, sodass jede Spark-Partition höchstens diese vielen Datensätze liest.
Sie können diese Option auch mit minPartitions. Wenn beide Optionen festgelegt sind, verwendet Spark je nachdem, welche Option zu weiteren Partitionen führt.
minPartitions None Positive Zahlen Die Mindestanzahl der Spark-Partitionen, die aus Kafka gelesen werden sollen. Bei Festlegung teilt der Verbinder große Kafka-Partitionen auf, um die Parallelität zu erhöhen. Wenn nicht festgelegt, erstellt Spark eine Partition für jede Kafka-Themenpartition. Nützlich für die Behandlung von Datenverknen oder Spitzenlasten.
Mit dieser Option werden Kafka-Consumer für jeden Trigger neu initialisiert, was sich auf die Leistung mit SSL auswirken kann.
startingOffsets latest (Streaming), earliest (Batch) earliest, latestoder eine JSON-Offsetzeichenfolge Der Offset, von dem die Abfrage mit dem Lesen beginnt. In der JSON-Zeichenfolge -1 ist der neueste Offset angegeben. -2 ist der früheste Offset. Beispiel: {"topicA":{"0":23,"1":-2}}.
Bei Streamingabfragen gilt diese Option nur, wenn eine neue Abfrage gestartet wird. Fortgesetzte Abfragen verwenden immer den Prüfpunkt. Während einer Abfrage beginnen neue Partitionen mit dem frühesten Offset.
Für Batchabfragen latest ist dies nicht zulässig.
startingOffsetsByTimestamp None Eine JSON-Zeitstempelzeichenfolge, z. B. {"topicA":{"0":1000,"1":2000}} Eine Liste der Startoffsets für jede Partition, die als Zeitstempel in Millisekunden angegeben ist. Wenn kein Offset für einen Zeitstempel vorhanden ist, wird das Abfrageverhalten durch startingOffsetsByTimestampStrategybestimmt.
Bei Streamingabfragen gilt diese Option nur, wenn eine neue Abfrage gestartet wird. Fortgesetzte Abfragen verwenden immer den Prüfpunkt. Während einer Abfrage beginnen neue Partitionen mit dem frühesten Offset.
startingOffsetsByTimestampStrategy error error, latest Die Zu verwendende Strategie, wenn kein Offset für einen in startingOffsetsByTimestamp oder startingTimestamp. error löst eine Ausnahme aus. latest verwendet den neuesten verfügbaren Offset.
startingTimestamp None Positive ganze Zahlen oder 0 Der globale Startzeitstempel in Millisekunden, der für alle Partitionen gilt. Wenn kein Offset für den Zeitstempel vorhanden ist, wird das Verhalten von startingOffsetsByTimestampStrategy.
subscribe None Eine durch Trennzeichen getrennte Liste von Themennamen Die Themen, die abonniert werden sollen. Sie müssen genau eine der subscribeOptionen subscribePatternangeben assign .
subscribePattern None Eine Java regex-Zeichenfolge Das Muster, das zum Abonnieren von Themen verwendet wird. Sie müssen genau eine der subscribeOptionen subscribePatternangeben assign . Beispiel: topic.*

Die folgenden Optionen gelten nur für Streaminglesevorgänge mit spark.readStream.format("kafka"):

Schlüssel Vorgabe Gültige Werte Description
bytesEstimateWindowLength 300s Dauerzeichenfolgen wie 10m oder 600s Das Zeitfenster, das verwendet wird, um verbleibende Bytes für die estimatedTotalBytesBehindLatest Metrik zu schätzen. Siehe Abrufen von Kafka-Metriken.
maxOffsetsPerTrigger None Positive Zahlen Die maximale Anzahl von Offsets für den Prozess pro Triggerintervall. Offsets werden proportional auf Themenpartitionen verteilt.
maxTriggerDelay 15m Dauerzeichenfolgen wie 10m oder 600s Die maximale Zeit bis zum minOffsetsPerTrigger Ansammeln vor dem Auslösen.
minOffsetsPerTrigger None Positive Zahlen Die minimale Anzahl der Offsets, die gesammelt werden sollen, bevor ein Mikrobatch ausgelöst wird. Wenn maxTriggerDelay erreicht ist, wird der Mikrobatch unabhängig davon ausgeführt.

Offsetoptionen, die nur für Batchlesevorgänge spark.read.format("kafka")gelten, finden Sie unter DataFrameReader Kafka-Optionen.

Authentifizierung

Databricks empfiehlt die Verwendung einer Unity-Katalogdienstanmeldeinformationen zur Authentifizierung bei cloudverwalteten Kafka-Diensten (AWS MSK, Azure Event Hubs oder Google Cloud Managed Kafka).

Schlüssel Vorgabe Gültige Werte Description
databricks.serviceCredential None Beliebige Zeichenfolge Der Name einer Unity-Katalogdienstanmeldeinformationen für die Authentifizierung bei cloudverwalteten Kafka-Diensten. Verfügbar in Databricks Runtime 16.1 und höher.
databricks.serviceCredential.scope None Beliebige Zeichenfolge Der OAuth-Bereich für die Dienstanmeldeinformationen. Legen Sie diese Einstellung nur fest, wenn Azure Databricks den Umfang ihres Kafka-Diensts nicht automatisch ableiten kann.

Wenn keine Dienstanmeldeinformationen verfügbar sind, verwenden Sie SASL/SSL-Optionen (übergeben als kafka.* Eigenschaften). Wenn Sie eine Dienstanmeldeinformationen verwenden, müssen Sie weder angeben kafka.sasl.mechanismnoch kafka.sasl.jaas.configkafka.security.protocol.

Schlüssel Vorgabe Gültige Werte Description
kafka.security.protocol None Eine Sicherheitsprotokollzeichenfolge, zSASL_SSL. B. , SSLPLAINTEXT Das Sicherheitsprotokoll für die Brokerkommunikation.
kafka.sasl.mechanism None Eine SASL-Mechanismuszeichenfolge, zPLAIN. B. , SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARERAWS_MSK_IAM Der SASL-Mechanismus.
kafka.sasl.jaas.config None Eine JAAS-Konfigurationszeichenfolge Die JAAS-Anmeldekonfigurationszeichenfolge.
kafka.sasl.login.callback.handler.class None Ein vollqualifizierter Klassenname Der vollqualifizierte Klassenname eines Anmelderückrufhandlers für die SASL-Authentifizierung.
kafka.sasl.client.callback.handler.class None Ein vollqualifizierter Klassenname Der vollqualifizierte Klassenname eines Clientrückrufhandlers für die SASL-Authentifizierung.
kafka.ssl.truststore.location None Eine Dateipfadzeichenfolge Der Pfad zur SSL-Vertrauensspeicherdatei.
kafka.ssl.truststore.password None Beliebige Zeichenfolge Das Kennwort für die SSL-Vertrauensspeicherdatei.
kafka.ssl.keystore.location None Eine Dateipfadzeichenfolge Der Pfad zur SSL-Schlüsselspeicherdatei.
kafka.ssl.keystore.password None Beliebige Zeichenfolge Das Kennwort für die SSL-Schlüsselspeicherdatei.

Vollständige Anweisungen zum Einrichten der Authentifizierung finden Sie unter "Authentifizierung".

Pub/Sub

Verwenden Sie diese Optionen, spark.readStream.format("pubsub") um Google Pub/Sub zu abonnieren. Die Optionen subscriptionId, topicId, und projectId sind erforderlich.

Schlüssel Vorgabe Gültige Werte Description
subscriptionId None Beliebige Zeichenfolge Required. Die Pub/Sub-Abonnement-ID. Der Connector erstellt das Abonnement, wenn er nicht vorhanden ist.
topicId None Beliebige Zeichenfolge Required. Die Pub/Sub-Themen-ID.
projectId None Beliebige Zeichenfolge Required. Die Google Cloud-Projekt-ID.
numFetchPartitions Die Hälfte der bei der Streaminitialisierung verfügbaren Executoren Positive Zahlen Die Anzahl der parallelen Spark-Aufgaben, die Zeilen aus dem Abonnement abrufen.
maxBytesPerTrigger None Positive Zahlen Eine weiche Grenze für die Anzahl der Bytes, die pro Mikrobatch verarbeitet werden sollen.
maxRecordsPerFetch 1000 Positive Zahlen Die Anzahl der Zeilen, die pro Aufgabe vor der Verarbeitung abgerufen werden sollen.
maxFetchPeriod 10s Eine Dauerzeichenfolge, z 1s . B. oder 1m Die Dauer, die jeder Aufgabe zum Abrufen vor der Verarbeitung von Zeilen zur Verfügung steht. Azure Databricks empfiehlt die Verwendung des Standardwerts.
deleteSubscriptionOnStreamStop false true, false When true, the subscription, from subscriptionId, is deleted when the streaming query ends.
serviceCredential None Beliebige Zeichenfolge Der Name einer Azure Databricks Dienstanmeldeinformationen für die Authentifizierung bei Pub/Sub. Verfügbar in Databricks Runtime 16.1 und höher.
clientEmail None Eine E-Mail-Adresszeichenfolge Die E-Mail-Adresse des Google Service-Kontos. Erforderlich, wenn keine Dienstanmeldeinformationen verwendet werden.
clientId None Beliebige Zeichenfolge Die Client-ID des Google-Dienstkontos. Erforderlich, wenn keine Dienstanmeldeinformationen verwendet werden.
privateKey None Eine private Schlüsselzeichenfolge Der private Schlüssel für das Google Service-Konto. Erforderlich, wenn keine Dienstanmeldeinformationen verwendet werden.
privateKeyId None Beliebige Zeichenfolge Die private Schlüssel-ID für das Google Service-Konto. Erforderlich, wenn keine Dienstanmeldeinformationen verwendet werden.

Weitere Informationen zu Pub/Sub finden Sie unter "Google Pub/Sub abonnieren".

Pulsar

Verwenden Sie diese Optionen zum spark.readStream.format("pulsar") Streamen von Apache Pulsar. Verfügbar in Databricks Runtime 14.1 und höher.

Die folgenden Optionen sind erforderlich. Sie müssen genau einen von topic, topics, oder topicsPattern.

Schlüssel Vorgabe Gültige Werte Description
service.url None Eine Pulsar-Dienst-URL-Zeichenfolge Der Pulsar serviceURL für den Pulsar-Dienst zum Beispiel pulsar://broker.example.com:6650.
topic None Beliebige Zeichenfolge Ein einzelner Themenname, der verwendet werden soll.
topics None Eine durch Trennzeichen getrennte Liste von Themennamen Eine durch Trennzeichen getrennte Liste der zu verwendenden Themennamen.
topicsPattern None Eine Java regex-Zeichenfolge Eine Java regex-Zeichenfolge, die den Themennamen entspricht.

Die folgenden Optionen werden ebenfalls unterstützt:

Schlüssel Vorgabe Gültige Werte Description
admin.url None Eine URL-Zeichenfolge Die HTTP-URL des Pulsar-Verwaltungsdiensts. Erforderlich, wenn maxBytesPerTrigger festgelegt wird.
allowDifferentTopicSchemas false true, false Wenn mehrere Themen mit unterschiedlichen Schemas gelesen werden, verwenden Sie diese Option, um die automatische schemabasierte Themenwert-Deserialisierung zu deaktivieren. Wenn dies true ist, werden nur die Rohwerte zurückgegeben.
failOnDataLoss true true, false Gibt an, ob die Abfrage fehlschlägt, wenn Daten verloren gehen. Beispielsweise kann es zu Datenverlust kommen, wenn Themen gelöscht oder Nachrichten aufgrund einer Aufbewahrungsrichtlinie ablaufen.
maxBytesPerTrigger None Positive Zahlen Eine weiche Grenze für die Anzahl der Bytes, die pro Mikrobatch verarbeitet werden sollen. Erfordert admin.url.
pollTimeoutMs 120000 Positive Zahlen Das Timeout für das Lesen von Nachrichten aus Pulsar in Millisekunden.
predefinedSubscription None Beliebige Zeichenfolge Der vordefinierte Abonnementname, der vom Connector zum Nachverfolgen des Spark-Anwendungsfortschritts verwendet wird.
startingOffsets latest latest, earliestoder eine JSON-Offsetzeichenfolge Wo sie mit dem Lesen beginnen.
subscriptionPrefix None Beliebige Zeichenfolge Das Präfix, das vom Connector zum Generieren eines zufälligen Abonnements zum Nachverfolgen des Fortschritts der Spark-Anwendung verwendet wird.
waitingForNonExistedTopic false true, false Gibt an, ob der Connector wartet, bis die gewünschten Themen erstellt werden.

Sie können zusätzliche Pulsar-Client-, Administrator- und Lesekonfigurationen mithilfe der folgenden Optionsmuster angeben:

Muster Konfigurationsoptionen
pulsar.admin.* Pulsar-Administratorkonfiguration
pulsar.client.* Pulsar Client-Konfiguration, einschließlich Authentifizierungsoptionen wie pulsar.client.authPluginClassName und pulsar.client.authParams.
pulsar.reader.* Pulsar-Leserkonfiguration

Weitere Informationen zu Pulsar Client- und Administratorauthentifizierungsoptionen finden Sie unter Authentifizierung.

Authentifizierung

Azure Databricks unterstützt die Truststore- und Keystore-Authentifizierung bei Pulsar. Azure Databricks empfiehlt die Verwendung geheimer Schlüssel zum Speichern von Authentifizierungsdetails. Siehe Verwaltung von Geheimnissen.

Schlüssel Vorgabe Gültige Werte Description
pulsar.client.authPluginClassName None Ein vollqualifizierter Klassenname Der vollqualifizierte Klassenname des Authentifizierungs-Plug-Ins. Beispiel: org.apache.pulsar.client.impl.auth.AuthenticationTls
pulsar.client.authParams None Eine Zeichenfolge für Anmeldeinformationen Authentifizierungsanmeldeinformationen, die als Zeichenfolge an das Authentifizierungs-Plug-In übergeben werden. Beispiel: tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem
pulsar.client.useKeyStoreTls false true, false Wenn true, aktiviert KeyStore-basierte TLS-Konfiguration anstelle von PEM-Formatdateien.
pulsar.client.tlsTrustStoreType None Beliebige Zeichenfolge Das Format der TLS-Vertrauensspeicherdatei. Beispiel: JKS
pulsar.client.tlsTrustStorePath None Eine Dateipfadzeichenfolge Der Pfad zur TLS-Vertrauensspeicherdatei, die vertrauenswürdige Zertifizierungsstellenzertifikate enthält. Erforderlich, wenn pulsar.client.useKeyStoreTls gleich true ist.
pulsar.client.tlsTrustStorePassword None Beliebige Zeichenfolge Das Kennwort für die TLS-Vertrauensspeicherdatei.

Wenn der Datenstrom eine PulsarAdminverwendet, können Sie auch die folgenden Optionen festlegen:

Schlüssel Vorgabe Gültige Werte Description
pulsar.admin.authPluginClassName None Ein vollqualifizierter Klassenname Der vollqualifizierte Klassenname des Authentifizierungs-Plug-Ins für den Pulsar Admin Client.
pulsar.admin.authParams None Eine Zeichenfolge für Anmeldeinformationen Authentifizierungsanmeldeinformationen für das Pulsar Admin Client-Authentifizierungs-Plug-In.
pulsar.admin.useTls None true, false Ob TLS für die Pulsar Admin-Clientverbindung verwendet werden soll.
pulsar.admin.tlsAllowInsecureConnection None true, false Ob unsichere TLS-Verbindungen für den Pulsar Admin-Client zugelassen werden sollen.
pulsar.admin.tlsTrustCertsFilePath None Eine Dateipfadzeichenfolge Pfad zur vertrauenswürdigen TLS-Zertifikatdatei für den Pulsar Admin Client.
pulsar.admin.useKeyStoreTls None true, false Ob KeyStore-basiertes TLS für den Pulsar Admin Client verwendet werden soll.
pulsar.admin.tlsTrustStoreType None Beliebige Zeichenfolge Das Format des TLS Trust Stores für den Pulsar Admin Client. Beispiel: JKS
pulsar.admin.tlsTrustStorePath None Eine Dateipfadzeichenfolge Pfad zur TLS Trust Store Datei für den Pulsar Admin Client. Erforderlich, wenn pulsar.admin.useKeyStoreTls gleich true ist.
pulsar.admin.tlsTrustStorePassword None Beliebige Zeichenfolge Kennwort für den Pulsar Admin-Client TLS Trust Store.

Authentifizierungsbeispiele finden Sie unter Authenticate to Pulsar.

DataFrameWriter-Optionen

Verwenden Sie diese Optionen mit DataFrameWriter.option() und DataFrameWriterV2.option(), um zu steuern, wie Azure Databricks Daten schreibt.

Example

Im folgenden Beispiel wird festgelegt mergeSchema , dass True eine Delta Lake-Tabelle geschrieben wird:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

Avro

Die folgenden Optionen gelten beim Schreiben von Avro-Dateien.

Schlüssel Vorgabe Gültige Werte Description
avroSchema None Eine JSON-Schemazeichenfolge Das vollständige Avro-Schema als JSON-Zeichenfolge. Verwenden Sie diese Option, um Spark SQL-Typen in bestimmte Avro-Typen zu konvertieren. Gilt für Avro-Dateien lesen und schreiben.
avroSchemaUrl None Eine URL-Zeichenfolge Eine URL, die auf eine Avro-Schemadatei verweist. Verwenden Sie anstelle der avroSchema externen Speicherung des Schemas. Gegenseitiger Ausschluss mit avroSchema Gilt für Avro-Dateien lesen und schreiben.
compression snappy uncompressed, deflate, snappy (default), bzip2, , xz, zstandard Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für Avro-Dateien lesen und schreiben.
recordName topLevelRecord Beliebige Zeichenfolge Der Name des Datensatzes der obersten Ebene im Avro-Ausgabeschema. Gilt für Avro-Dateien lesen und schreiben.
positionalFieldMatching false true, false Gibt an, ob Spalten zwischen dem Spark-Schema und dem Avro-Schema anhand der Feldposition anstelle des Namens übereinstimmen sollen. Gilt für Avro-Dateien lesen und schreiben.
recordNamespace Leere Zeichenfolge Beliebige Zeichenfolge Der Namespace für den Datensatz der obersten Ebene im Ausgabe-Avro-Schema. Gilt für Avro-Dateien lesen und schreiben.

Delta Lake und Apache Iceberg

Die folgenden Optionen gelten beim Schreiben von Delta Lake- und Apache Iceberg-Tabellen.

Schlüssel Vorgabe Gültige Werte Description
clusterByAuto false true, false Gibt an, ob die automatische Flüssigclusterung aktiviert werden soll, wobei Azure Databricks gruppierte Spalten basierend auf Abfragemustern auswählt. Nur gültig mit mode("overwrite"). Kann nicht im append Modus verwendet werden. Verfügbar in Databricks Runtime 16.4 und höher. Gilt für die Verwendung von Flüssigclustering für Tabellen.
mergeSchema None true, false Gibt an, ob die Schemaentwicklung für den Schreibvorgang aktiviert werden soll. Dem Zieltabellenschema werden neue Spalten im Quelldatenframe hinzugefügt. Gilt für Anfüge- und Batch-Streaming.Applies to batch and streaming appends. Gilt für das Update-Tabellenschema.
overwriteSchema None true, false Gibt an, ob das Tabellenschema und die Partitionierung beim Überschreiben ersetzt werden sollen. Erfordert mode("overwrite") ohne replaceWhere. Kann nicht mit partitionOverwriteMode verwendet werden. Gilt für das Update-Tabellenschema.
partitionOverwriteMode None static, dynamic Der Partitionsüberschreibmodus. Legen Sie diese Einstellung fest, dynamic um nur Partitionen zu überschreiben, die neue Daten enthalten, sodass alle anderen Partitionen unverändert bleiben. Legacymodus, nicht unterstützt auf serverlosen Compute oder Databricks SQL. Gilt für selektives Überschreiben von Daten mit Delta Lake.
replaceOn None Eine boolesche Ausdruckszeichenfolge Ein boolescher Ausdruck, der Zeilen in der Zieltabelle entspricht, die durch Zeilen aus der Quellabfrage ersetzt werden sollen. Kann sowohl in der Zieltabelle als auch in der Quellabfrage auf Spalten verweisen. Zeilen im Ziel, die einer Quellzeile entsprechen, werden gelöscht und ersetzt. Wenn die Quelle leer ist, treten keine Löschungen auf. Wird verwendet targetAlias , um mehrdeutigen Spaltenverweise zu unterscheiden. Verfügbar in Databricks Runtime 17.1 und höher. Gilt für selektives Überschreiben von Daten mit Delta Lake.
replaceUsing None Eine durch Trennzeichen getrennte Liste von Spaltennamen Eine durch Trennzeichen getrennte Liste von Spaltennamen, die verwendet werden, um Zeilen zwischen der Zieltabelle und der Quellabfrage abzugleichen. Sowohl das Ziel als auch die Quelle müssen alle aufgelisteten Spalten enthalten. Zeilen im Ziel, die einer Quellzeile unter Gleichheitsvergleich entsprechen, werden gelöscht und ersetzt. NULL Werte werden nicht gleich behandelt und stimmen nicht überein. Verfügbar in Databricks Runtime 16.3 und höher. Gilt für selektives Überschreiben von Daten mit Delta Lake.
replaceWhere None Eine Prädikatausdruckszeichenfolge Ein Prädikatausdruck. Atomar überschreibt nur die Datensätze, die dem Prädikat entsprechen. Gilt für selektives Überschreiben von Daten mit Delta Lake.
targetAlias None Beliebige Zeichenfolge Ein Zeichenfolgenalias für die Zieltabelle. Wird verwendet, um replaceOn Spaltenverweise zu unterscheiden, replaceWhere wenn die Bedingung Spalten aus der Zieltabelle und der Quellabfrage referenziert. Gilt für selektives Überschreiben von Daten mit Delta Lake.
txnAppId None Beliebige Zeichenfolge Eine eindeutige Zeichenfolge, die die Anwendung für idempotent-Schreibvorgänge in foreachBatch Vorgänge identifiziert. Verwenden Sie diese Verwendung zusammen, txnVersion um sicherzustellen, dass in mehrere Delta Lake-Tabellen genau einmal geschrieben wird. Gilt für foreachBatch Schreibvorgänge der idempotenten Tabelle.
txnVersion None Eine monoton zunehmende ganze Zahl Eine monoton steigende Zahl, die als Transaktionsversion für idempotente Schreibvorgänge in foreachBatch Vorgängen verwendet wird. Verwenden Sie diese Verwendung zusammen, txnAppId um sicherzustellen, dass in mehrere Delta Lake-Tabellen genau einmal geschrieben wird. Gilt für foreachBatch Schreibvorgänge der idempotenten Tabelle.
optimizeWrite None true, false Gibt an, ob das automatische Optimieren des Schreibens für diesen Schreibvorgang aktiviert werden soll. Setzt die spark.databricks.delta.optimizeWrite.enabled Konfiguration außer Kraft. Gilt für What is Delta Lake in Azure Databricks?.
userMetadata None Beliebige Zeichenfolge Eine benutzerdefinierte Zeichenfolge, die an die Commitmetadaten für den Schreibvorgang angefügt wurde. Sichtbar in der Ausgabe von DESCRIBE HISTORY. Gilt für Anreichern von Tabellen mit benutzerdefinierten Metadaten.

CSV

Die folgenden Optionen gelten beim Schreiben von CSV-Dateien.

Schlüssel Vorgabe Gültige Werte Description
charToEscapeQuoteEscaping \0 (nicht aktiviert) Ein einzelnes Zeichen Das Zeichen, das zum Escapezeichen verwendet wird, wenn es sich von dem Anführungszeichen unterscheidet. Gilt für CSV (DataFrameWriter).
compression none none (default), bzip2, gzip, lz4, snappy, , deflate, zstd Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für CSV (DataFrameWriter).
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Formatzeichenfolge für Datumsspaltenwerte. Gilt für CSV (DataFrameWriter).
emptyValue Leere Zeichenfolge Beliebige Zeichenfolge Die Zeichenfolge, die für leere Werte (nicht null) geschrieben wurde. Gilt für CSV (DataFrameWriter).
encoding UTF-8 Ein java.nio.charset.Charset Name Die Zeichencodierung für die Ausgabedateien. Gilt für CSV (DataFrameWriter).
escape \ Ein einzelnes Zeichen Das Zeichen, das verwendet wird, um an zitierte Werte zu escapen. Gilt für CSV (DataFrameWriter).
escapeQuotes true true, false Gibt an, ob Anführungszeichen in Anführungszeichen in Anführungszeichen gesetzt werden sollen. Gilt für CSV (DataFrameWriter).
header false true, false Gibt an, ob Spaltennamen als erste Zeile der Ausgabe geschrieben werden sollen. Gilt für CSV (DataFrameWriter).
ignoreLeadingWhiteSpace false true, false Gibt an, ob führende Leerzeichen beim Schreiben von Werten gekürzt werden sollen. Gilt für CSV (DataFrameWriter).
ignoreTrailingWhiteSpace false true, false Gibt an, ob nachgestellte Leerzeichen beim Schreiben von Werten gekürzt werden sollen. Gilt für CSV (DataFrameWriter).
lineSep \n Eine Zeichenfolge Die zeilentrennzeichenzeichenfolge, die zwischen Datensätzen verwendet wird. Gilt für CSV (DataFrameWriter).
locale en-US Ein java.util.Locale Bezeichner Ein java.util.Locale-Bezeichner. Ein Java Gebietsschema identifiziert, das sich auf das Standarddatum, den Zeitstempel und die Dezimalanalyse innerhalb der CSV auswirkt.
nullValue Leere Zeichenfolge Beliebige Zeichenfolge Zeichenfolge, die für Nullwerte geschrieben wurde. Gilt für CSV (DataFrameWriter).
quote " Ein einzelnes Zeichen Das Zeichen, das zum Anführungszeichen von Feldwerten verwendet wird, die das Trennzeichen enthalten. Gilt für CSV (DataFrameWriter).
quoteAll false true, false Gibt an, ob alle Feldwerte unabhängig vom Inhalt in Anführungszeichen eingeschlossen werden sollen. Gilt für CSV (DataFrameWriter).
sep , Eine Zeichenfolge Das Feldtrennzeichen. Gilt für CSV (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Die Formatzeichenfolge für Zeitstempelspaltenwerte. Gilt für CSV (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Formatzeichenfolge für Zeitstempel ohne Zeitzonenspaltenwerte.TimestampNTZType

Excel

Die folgenden Optionen gelten beim Schreiben Excel Dateien.

Schlüssel Vorgabe Gültige Werte Description
dataAddress None Ein Blattname oder eine Zellbezugszeichenfolge Der Blattname oder die Startzelle für den Schreibvorgang. Wenn dieser Wert nicht angegeben wird, wird in ein Blatt geschrieben, das bei Zelle Sheet1beginntA1. Akzeptiert einen Blattnamen (SheetName) oder einen einzelnen Zellbezug (SheetName!A1). Zellbereiche werden für Schreibvorgänge nicht unterstützt.
dateFormatInWrite yyyy-mm-dd Eine Excel Datumsformatzeichenfolge Excel Zellenformatzeichenfolge, die auf Date Spalten angewendet wird. Verwendet Excel Formatsyntax.
headerRows 0 0, 1 Gibt an, ob Spaltennamen als erste Zeile geschrieben werden sollen.
timestampNTZFormat yyyy-mm-dd hh:mm:ss Eine Excel Zeitstempelformatzeichenfolge Excel Zellenformatzeichenfolge, die auf TimestampNTZ und Timestamp Spalten angewendet wird. Verwendet Excel Formatsyntax.
version xlsx xlsx, xls Die Excel zu schreibenden Dateiformatversion.

JSON

Die folgenden Optionen gelten beim Schreiben von JSON-Dateien.

Schlüssel Vorgabe Gültige Werte Description
compression none none, bzip2, gzip, lz4, snappy, , deflate, zstd Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für json (DataFrameWriter).
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Formatzeichenfolge für Datumsspaltenwerte. Gilt für json (DataFrameWriter).
encoding UTF-8 Ein java.nio.charset.Charset Name Die Zeichencodierung für die Ausgabedateien. Gilt für json (DataFrameWriter).
ignoreNullFields Wert von spark.sql.jsonGenerator.ignoreNullFields true, false Gibt an, ob Felder mit NULL-Werten aus der JSON-Ausgabe weggelassen werden sollen. Gilt für json (DataFrameWriter).
lineSep \n Eine Zeichenfolge Die zeilentrennzeichenzeichenfolge, die zwischen Datensätzen verwendet wird. Gilt für json (DataFrameWriter).
locale en-US Ein java.util.Locale Bezeichner Ein Java Gebietsschemabezeichner, der sich auf das Standarddatum, den Zeitstempel und die Dezimalanalyse innerhalb des JSON-Codes auswirkt.
pretty false true, false Gibt an, ob die JSON-Ausgabe ziemlich (eingezogen, multiline) aktiviert werden soll.
sortKeys false true, false Gibt an, ob die Schlüssel von JSON-Objekten alphabetisch in der Ausgabe sortiert werden sollen. Nützlich für die Herstellung von deterministischen Ausgaben.
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Die Formatzeichenfolge für Zeitstempelspaltenwerte. Gilt für json (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Formatzeichenfolge für Zeitstempel ohne Zeitzonenspaltenwerte.TimestampNTZType
writeNonAsciiCharacterAsCodePoint false true, false Gibt an, ob Nicht-ASCII-Zeichen als \uXXXX Unicode-Escapesequenzen anstelle von literalen UTF-8-Zeichen in der Ausgabe codiert werden sollen.

ORC

Die folgenden Optionen gelten beim Schreiben von ORC-Dateien.

Schlüssel Vorgabe Gültige Werte Description
compression zstd none, uncompressed, snappy, zlib, , lzo, zstd, lz4, brotli Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für Orc (DataFrameWriter).

Parkett

Die folgenden Optionen gelten beim Schreiben von Parkettdateien.

Schlüssel Vorgabe Gültige Werte Description
compression snappy none, uncompressed, snappygzip, lzo, brotli, lz4, lz4_rawzstd Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für Parkett (DataFrameWriter).
spark.sql.parquet.outputTimestampType INT96 INT96, TIMESTAMP_MICROSTIMESTAMP_MILLIS Der physische Typ, der zum Codieren von Zeitstempelspalten verwendet wird. Wird INT96 zur Kompatibilität mit älteren Parkettlesern verwendet, die die Standardzeitstempeltypen nicht unterstützen.

Text

Die folgenden Optionen gelten beim Schreiben von Textdateien.

Schlüssel Vorgabe Gültige Werte Description
compression none none, bzip2, gzip, lz4, snappy, , deflate, zstd Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für Text (DataFrameWriter).
encoding UTF-8 Ein java.nio.charset.Charset Name Die Zeichencodierung für die Ausgabedateien.
lineSep \n Eine Zeichenfolge Die zeilentrennzeichenzeichenfolge, die zwischen Datensätzen verwendet wird. Gilt für Text (DataFrameWriter).

XML

Die folgenden Optionen gelten beim Schreiben von XML-Dateien.

Schlüssel Vorgabe Gültige Werte Description
arrayElementName item Beliebige Zeichenfolge Der Elementname für Arrayelemente ohne expliziten Namen. Gilt für XML (DataFrameWriter).
attributePrefix _ Beliebige Zeichenfolge Das Präfix, das Feldnamen vorangestellt ist, die XML-Attributen entsprechen. Gilt für XML (DataFrameWriter).
compression none none, bzip2, gzip, lz4, snappy, , deflate, zstd Beim Schreiben zu verwendenden Komprimierungscodecs. Gilt für XML (DataFrameWriter).
dateFormat yyyy-MM-dd Eine Datumsformatzeichenfolge Formatzeichenfolge für Datumsspaltenwerte. Gilt für XML (DataFrameWriter).
declaration version="1.0" encoding="UTF-8" standalone="yes" Eine XML-Deklarationszeichenfolge oder leere Zeichenfolge, die unterdrückt werden soll Die XML-Deklarationszeichenfolge, die oben in jeder Ausgabedatei geschrieben wurde. Legen Sie diesen Wert auf eine leere Zeichenfolge fest, um die Deklaration zu unterdrücken. Gilt für XML (DataFrameWriter).
encoding UTF-8 Ein java.nio.charset.Charset Name Die Zeichencodierung für die Ausgabedateien. Gilt für XML (DataFrameWriter).
indent 4 Leerzeichen Beliebige Zeichenfolge Die Zeichenfolge, die zum Einrücken untergeordneter Elemente in der Ausgabe verwendet wird. Legen Sie diesen Wert auf eine leere Zeichenfolge fest, um den Einzug zu deaktivieren und jede Zeile in einer einzelnen Zeile zu schreiben.
locale en-US Ein java.util.Locale Bezeichner Ein Java Gebietsschemabezeichner, der sich auf die Standardformatierung von Datum, Zeitstempel und Dezimalformatierung innerhalb des XML-Codes auswirkt.
nullValue null Beliebige Zeichenfolge Die Zeichenfolge, die für Nullwerte geschrieben wurde. Bei Festlegung auf null, Attribute und untergeordnete Elemente für NULL-Felder werden weggelassen. Gilt für XML (DataFrameWriter).
rootTag ROWS Beliebige Zeichenfolge Das Stammelementtag, das alle Zeilenelemente in der Ausgabe umschließt. Gilt für XML (DataFrameWriter).
rowTag ROW Beliebige Zeichenfolge Das Elementtag, das eine Zeile in der Ausgabe darstellt. Gilt für XML (DataFrameWriter).
singleVariantColumn None Eine Spaltennamenzeichenfolge Der Name der einzelnen Variant-Spalte, die in XML-Dateien geschrieben werden soll. Gilt für XML (DataFrameWriter).
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] Eine Zeitstempelformatzeichenfolge Die Formatzeichenfolge für Zeitstempelspaltenwerte. Gilt für XML (DataFrameWriter).
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] Eine Zeitstempelformatzeichenfolge Formatzeichenfolge für Zeitstempel ohne Zeitzonenspaltenwerte. Gilt für XML (DataFrameWriter).
validateName true true, false Gibt an, ob eine Ausnahme ausgelöst werden soll, wenn ein Spaltenname kein gültiger XML-Elementbezeichner ist. Gilt für XML (DataFrameWriter).
valueTag _VALUE Beliebige Zeichenfolge Der Feldname, der für Zeichendaten in XML-Elementen verwendet wird, die ebenfalls Attribute oder untergeordnete Elemente aufweisen. Gilt für XML (DataFrameWriter).

DataStreamWriter-Optionen

Verwenden Sie diese Optionen zum DataStreamWriter.option() Konfigurieren von Streaming-Schreibvorgängen.

Example

Im folgenden Beispiel wird die Prüfpunktposition für einen Datenstrom festgelegt:

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

Gemeinsam

Die folgenden Optionen gelten für alle Streaming-Schreibvorgänge.

Schlüssel Vorgabe Gültige Werte Description
checkpointLocation Keine (erforderlich) Eine Pfadzeichenfolge Pfad zum Prüfpunktverzeichnis für die Streamingabfrage. Erforderlich für Fehlertoleranz und genau einmal verarbeitete Garantien. Jede Streamingabfrage muss einen eindeutigen Prüfpunktspeicherort verwenden. Databricks empfiehlt, Prüfpunkte in einem Unity-Katalogvolume oder Cloudspeicherpfad zu speichern. Siehe Prüfpunkte für strukturiertes Streaming.
path None Eine Pfadzeichenfolge Ausgabepfad für dateibasierte Streaming-Senken wie Parkett. Gilt nur für dateiformatbasierte Formate.

Konsolenspüle

Die folgenden Optionen gelten beim Schreiben von Datenströmen in die Konsolenspüle.

Schlüssel Vorgabe Gültige Werte Description
numRows 20 Positive Zahlen Die Anzahl der Zeilen, die beim Schreiben in die Konsolenspüle für jeden Mikrobatch angezeigt werden sollen.
truncate true true, false Gibt an, ob lange Zeichenfolgen beim Anzeigen von Zeilen abgeschnitten werden sollen. Legen Sie fest, dass false vollständige Zeichenfolgenwerte angezeigt werden.

Delta Lake

Die folgenden Optionen gelten beim Schreiben eines Datenstroms in eine Delta Lake-Tabelle mit format("delta"). Überschreiboptionen wie overwriteSchema, replaceWhereund partitionOverwriteMode werden für Streaming-Schreibvorgänge nicht unterstützt.

Schlüssel Vorgabe Gültige Werte Description
mergeSchema false true, false Gibt an, ob das Delta Lake-Tabellenschema weiterentwickelt werden soll, wenn das Streaming DataFrame neue Spalten enthält. Gilt nur für das Anfügen des Ausgabemodus. Gilt für das Update-Tabellenschema.
userMetadata None Beliebige Zeichenfolge Eine benutzerdefinierte Zeichenfolge, die an die Commitmetadaten für den Schreibvorgang angefügt wurde. Sichtbar in der Ausgabe von DESCRIBE HISTORY. Gilt für Anreichern von Tabellen mit benutzerdefinierten Metadaten.

Dateispüle

Die folgende Option gilt beim Schreiben eines Datenstroms in dateibasierte Formate (Parkett, JSON, CSV, ORC, Text). Formatspezifische Optionen finden Sie unter DataFrameWriter-Optionen.

Schlüssel Vorgabe Gültige Werte Description
retention None Eine Zeitzeichenfolge, z 7 days . B. oder 24 hours Wie lange Senke-Metadatendateien aufbewahrt werden, die für Fehlertoleranz und Komprimierung verwendet werden. Wenn sie nicht festgelegt ist, werden Metadatendateien unbegrenzt aufbewahrt.

Kafka Spüle

Die folgenden Optionen gelten beim Schreiben in Kafka.

Schlüssel Vorgabe Gültige Werte Description
kafka.bootstrap.servers None Eine durch Trennzeichen getrennte Liste von host:port Zeichenfolgen Required. Eine durch Trennzeichen getrennte Liste der Kafka-Brokeradressen host:port .
topic None Beliebige Zeichenfolge Das Zielthema "Kafka" für alle Zeilen. Erforderlich, wenn der DataFrame keine Spalte enthält topic .
kafka.* None Beliebiger Konfigurationswert des Kafka-Produzenten Jede Kafka-Produzentenkonfiguration mit kafka.dem Präfix . Beispiel: kafka.compression.type

Speichersenke

Die folgenden Optionen gelten beim Schreiben von Datenströmen in die Speichersenke.

Schlüssel Vorgabe Gültige Werte Description
queryName Keine (erforderlich) Beliebige Zeichenfolge Der Name der In-Memory-Tabelle, in die die Abfrage schreibt. Erforderlich für die Speichersenke. Auch konfigurierbar über .queryName().
mode exactlyonce exactlyonce, atleastonce Liefergarantie für die Speichersenke. exactlyonce verwendet den Mikrobatchmodus mit genau einmaler Semantik. atleastonce verwendet den fortlaufenden Modus mit mindestens einmal semantischer Semantik.

Spark-Funktionsoptionen

Einige integrierte Spark SQL-Funktionen akzeptieren eine options Zuordnung, die das Analyse- oder Serialisierungsverhalten steuert. Übergeben Sie Optionen als Python dict oder scala Map[String, String].

Example

Im folgenden Beispiel wird eine JSON-Spalte analysiert, während falsch formatierte Datensätze abgelegt werden:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

Avro

Avro-Funktionen akzeptieren dieselben Optionen wie die entsprechenden DataFrame-Optionen:

Example

Im folgenden Beispiel wird eine Avro-Spalte mit aktivierter Schemaentwicklung decodiert:

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

Darüber hinaus akzeptieren die Schemaregistrierungsvarianten die from_avroto_avro folgenden Optionen:

Schlüssel Vorgabe Gültige Werte Description
schemaId None Eine Ganze Zahl der Schema-ID Schema-ID aus der Confluent-Schemaregistrierung, die beim Decodieren von Avro-Daten verwendet werden soll, die mit einem Schema inkompatibel jsonFormatSchemasind. Gilt nur für from_avro .
confluent.schema.registry.* None Any Confluent SR client property value Confluent Schema Registry-Clientkonfigurationseigenschaften. Übergeben Sie eine beliebige Confluent SR-Clienteigenschaft mit diesem Präfix, z confluent.schema.registry.basic.auth.user.info . B. für Standardauthentifizierungsanmeldeinformationen. Erforderlich für die Schemaregistrierungsvarianten von from_avro und to_avro.

CSV

CSV-Funktionen akzeptieren dieselben Optionen wie die entsprechenden DataFrame-Optionen:

Example

Im folgenden Beispiel wird CSV mit einem benutzerdefinierten Trennzeichen und NULL -Wert gelesen:

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

JSON

JSON-Funktionen akzeptieren dieselben Optionen wie die entsprechenden DataFrame-Optionen:

Example

Das folgende Beispiel schreibt JSON mit NULL ignorierten Feldern und aktivierter ziemlicher Formatierung:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobuf und to_protobuf verwenden Sie keine dateibasierte Datenquelle. Protobuf-Daten werden mit diesen Funktionen immer als binäre Spalten gelesen und geschrieben. Optionen werden als Groß Map[String, String] - und Kleinschreibung übergeben.

Example

Im folgenden Beispiel wird eine Protobuf-Spalte im PERMISSIVE-Modus decodiert:

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Protobuf-Funktionen verwenden die folgenden Optionen:

Schlüssel Vorgabe Gültige Werte Description
mode FAILFAST FAILFAST, PERMISSIVE So behandeln Sie beschädigte Datensätze. FAILFAST löst eine Ausnahme aus. PERMISSIVE legt falsch formatierte Felder auf NULL fest. Gilt für from_protobuf.
recursive.fields.max.depth -1 (deaktiviert) 0 bis 10 Maximale Rekursionstiefe für rekursive Protobuf-Felder. Legen Sie fest, 0 dass die rekursive Feldunterstützung deaktiviert werden soll. Gilt für from_protobuf.
convert.any.fields.to.json false true, false Gibt an, ob Protobuf-Felder Any anstelle einer JSON-Zeichenfolge in eine STRUCTJSON-Zeichenfolge konvertiert werden sollen. Gilt für from_protobuf.
emit.default.values false true, false Gibt an, ob Felder mit Null- oder Standardwerten (Proto3-Semantik) ausgegeben werden sollen. Wenn false, Felder mit Standardwerten werden aus der Ausgabe weggelassen. Gilt für from_protobuf.
enums.as.ints false true, false Gibt an, ob Enumerationsfelder anstelle von Zeichenfolgen als ganzzahlige Werte gerendert werden sollen. Gilt für from_protobuf.
upcast.unsigned.ints false true, false Gibt an, uint32 ob Der Ganzzahlüberlauf auf Long und uint64 verhindert werden soll Decimal(20,0) . Gilt für from_protobuf.
unwrap.primitive.wrapper.types false true, false Gibt an, google.protobuf ob Wrappertypen (z. B. und Int32Value) auf die entsprechenden primitiven Spark-Typen entpackt StringValue werden sollen. Gilt für from_protobuf.
retain.empty.message.types false true, false Gibt an, ob leere Protobuf-Nachrichtentypen im Ausgabeschema beibehalten werden sollen, indem Sie eine Dummyspalte einfügen. Gilt für from_protobuf.
schema.registry.subject None Beliebige Zeichenfolge Name des Schemaregistrierungsbetreffs. Erforderlich bei Verwendung der Schemaregistrierungsvarianten von from_protobuf und to_protobuf.
schema.registry.address None Eine host:port Zeichenfolge Schemaregistrierungsadresse (Host und Port). Erforderlich bei Verwendung der Schemaregistrierungsvarianten von from_protobuf und to_protobuf.
schema.registry.protobuf.name None Beliebige Zeichenfolge Gibt an, welche Protobuf-Nachricht verwendet werden soll, wenn der Betreff der Schemaregistrierung mehrere Nachrichten enthält. Dies ist optional.
schema.registry.schema.evolution.mode "restart" "restart", "none" Wie Schemaänderungen behandelt werden, wenn eine neuere Schema-ID in einem eingehenden Datensatz erkannt wird. "restart" beendet die Abfrage mit einem UnknownFieldException; konfiguriert Aufträge, um neu zu starten, wenn Änderungen nicht aufgenommen werden. "none" ignoriert Schema-ID-Änderungen und analysiert neuere Datensätze mit dem ursprünglichen Schema.
confluent.schema.registry.<option> Beliebiger gültiger Clientoptionswert für confluent Schema Registry Übergeben Sie eine beliebige Confluent Schema Registry-Clientoption mithilfe des Präfixes "confluent.schema.registry". Legen Sie z. B. die Standardauthentifizierung "confluent.schema.registry.basic.auth.credentials.source" fest "USER_INFO" und "confluent.schema.registry.basic.auth.user.info" konfigurieren sie "<KEY>:<SECRET>" fest.

XML

XML-Funktionen akzeptieren dieselben Optionen wie die entsprechenden DataFrame-Optionen:

Example

Im folgenden Beispiel wird XML mit benutzerdefinierten Stamm- und Zeilentags geschrieben:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))