Spark API オプション リファレンス

このページでは、データの読み取りと書き込みを行う Spark API で使用できる入力オプションと出力オプションの一覧を示します。

DataFrameReader オプション

これらのオプションは、DataFrameReader.option()DataFrameReader.options()read_filesCOPY INTO、および Auto Loader と共に使用Azure Databricksデータ ファイルの読み取り方法を制御します。

Example

次の例では、JSON ファイルを読み取るためのmultiLineTrueを設定します。

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)

共通

次のオプションは、すべてのファイル形式に適用されます。

Key デフォルト 有効な値 Description
ignoreCorruptFiles false truefalse 破損したファイルを無視するかどうか。 true の場合、破損したファイルが検出されても Spark ジョブは引き続き実行され、読み取られた内容は引き続き返されます。 COPY INTOでは、Delta Lake 履歴のnumSkippedCorruptFiles列にoperationMetricsとして、スキップされた破損したファイルを観察できます。 Databricks Runtime 11.3 LTS 以降で使用できます。
ignoreMissingFiles false自動ローダーの場合は、trueCOPY INTO (レガシ) truefalse 行方不明のファイルを無視するかどうかを指定します。 true の場合、不足しているファイルが検出されても Spark ジョブは引き続き実行され、内容は引き続き返されます。 Databricks Runtime 11.3 LTS 以降で使用できます。
modifiedAfter None タイムスタンプ文字列 指定したタイムスタンプの後に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。
modifiedBefore None タイムスタンプ文字列 指定したタイムスタンプの前に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。
pathGlobFilter または fileNamePattern None glob パターン文字列 ファイルを選択するための可能性のある glob パターン。 PATTERN (レガシ) でのCOPY INTOに相当します。 fileNamePattern では read_files を使用できます。
recursiveFileLookup false truefalse true場合、このオプションは、名前が date=2019-07-01 のようなパーティションの名前付けスキームに従っていない場合でも、入れ子になったディレクトリを検索します。

アブロ

Avro ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
avroSchema None Avro スキーマ文字列 Avro 形式のユーザーによって指定された省略可能なスキーマ。 Avro を読み取る場合、このオプションは互換性があるが、実際の Avro スキーマとは異なる進化したスキーマに設定できます。 逆シリアル化スキーマは、進化したスキーマと一致します。 たとえば、既定値を持つ追加の列を 1 つ含む進化したスキーマを設定した場合、読み取り結果にも新しい列が含まれます。
avroSchemaEvolutionMode none nonerestart スキーマ レジストリを使用するときにスキーマの進化を処理する方法。 none はスキーマの変更を無視し、ジョブを続行します。 restart では、スキーマの変更が検出され、ジョブの再起動が必要な場合に UnknownFieldException が発生します。
datetimeRebaseMode LEGACY EXCEPTIONLEGACYCORRECTED ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。
enableStableIdentifiersForUnionType false truefalse Avro 共用体型に安定したフィールド名を使用するかどうか。 有効にすると、共用体の型フィールド名は、小文字の型名から派生します (たとえば、 member_intmember_string)。 2 つの型名が小文字の後で同一の場合、例外をスローします。
mergeSchema false truefalse 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 Avro に対して mergeSchema を有効にしても、データ型は緩和されません。
mode FAILFAST FAILFASTPERMISSIVEDROPMALFORMED 破損したレコードを処理するためのパーサー モード。 FAILFAST で例外がスローされました。 PERMISSIVE は、形式が正しくないフィールドを null に設定します。 DROPMALFORMED 不正なレコードが自動的に削除されます。
readerCaseSensitive true truefalse rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
recursiveFieldMaxDepth None 0 から 15 に変更します 再帰 Avro フィールドの最大再帰深度。 すべての再帰フィールドを切り捨てる 1 に設定し、1 レベルの再帰を許可する 2 など、最大 15に設定します。 設定解除または 0場合、再帰フィールドは許可されません。
rescuedDataColumn None 列名の文字列 データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
詳細については、「復旧されたデータ列とは」を参照してください。
stableIdentifierPrefixForUnionType member_ 任意の文字列 enableStableIdentifiersForUnionType=trueするときに、安定した共用体型のフィールド名に使用するプレフィックス。

Csv

CSV ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
badRecordsPath None パス文字列 不正な CSV レコードに関する情報を記録するためのファイルを格納するパス。
charToEscapeQuoteEscaping \0 1 文字 引用符のエスケープに使用する文字をエスケープするために使用する文字。 たとえば、レコードが [ " a\\", b ] の場合は次のようになります。
  • '\'をエスケープする文字が未定義の場合、レコードは解析されません。 パーサーは文字 ([a],[\],["],[,],[ ],[b]) を読み取りますが、終了引用符が見つからないため、エラーをスローします。
  • '\' をエスケープする文字が '\'として定義されている場合、[a\][b]の 2 つの値でレコードが読み取られます。
columnNameOfCorruptRecord _corrupt_record 列名の文字列 自動ローダーがサポートされています。 COPY INTO (レガシ) ではサポートされていません。
形式に誤りがあり、解析できないレコードを格納するための列。 解析の modeDROPMALFORMED に設定する場合、この列は空になります。
comment \0 1 文字 テキスト行の先頭に配置した場合に行コメントを表す文字を定義します。 コメントのスキップを無効にするには、'\0' を使用します。
dateFormat yyyy-MM-dd 日付書式指定文字列 日付文字列を解析するための形式。
emptyValue 空の文字列 任意の文字列 空の値の文字列表現。
enableDateTimeParsingFallback false truefalse 指定した形式で値を解析できない場合に、従来の日付とタイムスタンプの解析動作にフォールバックするかどうかを指定します。 falseすると、解析エラーが発生するか、modeに応じて null が生成されます。
encoding または charset UTF-8 java.nio.charset.Charset CSV ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。 UTF-16UTF-32 の場合、multilinetrue を使用することはできません。
enforceSchema true truefalse 指定または推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 自動ローダーを使用してデータをレスキューし、スキーマの展開を許可する場合、このオプションは既定では無視されます。
escape \ 1 文字 データの解析時に使用するエスケープ文字。
extension csv ファイル拡張子文字列 読み取りに必要なファイル名拡張子。 この拡張子のないファイルは除外されます。
failOnUnknownFields false truefalse CSV レコードにスキーマに存在しない列が含まれている場合に失敗するかどうか。 falseすると、認識されない列は、rescuedDataColumnに応じて自動的に削除または救助されます。
failOnWidenedFields false truefalse フィールド値を、拡大せずに宣言されたスキーマ型として解析できない場合に失敗するかどうか。 falseすると、rescuedDataColumnに応じて、型が拡大された値が自動的に復旧されます。 failOnUnknownFields=true設定すると、このオプションの効果をマスクできます。
header false truefalse CSV ファイルにヘッダーが含まれているかどうか。 自動ローダーによって、スキーマの推論時にファイルにヘッダーが含まれているものと見なされます。
ignoreLeadingWhiteSpace false truefalse 解析対象の各値の先頭の空白文字を無視するかどうか。
ignoreTrailingWhiteSpace false truefalse 解析対象の各値の末尾の空白文字を無視するかどうか。
inferSchema false truefalse 解析対象の CSV レコードのデータ型を推論するか、すべての列が StringType であると見なすか。 true に設定した場合は、追加でデータを渡す必要があります。 自動ローダーの場合は、代わりに cloudFiles.inferColumnTypes を使います。
inputBufferSize 1048576 (1 MB) 正の整数 CSV パーサーのバッファー サイズ (バイト単位)。 大きな CSV ファイルを解析するときにメモリ使用量を調整する場合に便利です。
lineSep なし。 \r\r\n、および \n 文字列 連続する 2 つの CSV レコードの間の文字列。
locale US java.util.Locale識別子 CSV 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与える、識別されたJavaロケール。
maxCharsPerColumn -1 正の整数、または無制限の-1 解析する値の予想最大文字数。 メモリ エラーを回避するために使用できます。 既定値は -1 で、無制限を意味します。
maxColumns 20480 正の整数 レコードに含めることができる列数のハード制限。
mergeSchema false truefalse 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 スキーマの推論時に、自動ローダーに対して既定で有効になります。
mode PERMISSIVE PERMISSIVEDROPMALFORMEDFAILFAST 形式に誤りがあるレコードの処理に関するパーサーのモード。
multiLine false truefalse CSV レコードが複数の行にまたがるかどうか。
nanValue NaN 任意の文字列 FloatType および DoubleType 列を解析する際の非数値の文字列表現。
negativeInf -Inf 任意の文字列 FloatType または DoubleType 列を解析する際の負の無限大の文字列表現。
nullValue 空の文字列 任意の文字列 null 値の文字列表現。
parserCaseSensitive (非推奨) false truefalse ファイルの読み取り中に、ヘッダーに宣言されている列をスキーマの大文字と小文字の区別に合わせるかどうか。 自動ローダーについては、これは既定で true となります。 有効にすると、大文字と小文字が異なる列は rescuedDataColumn に救出されます。 readerCaseSensitive が優先されるため、このオプションは非推奨となりました。
positiveInf Inf 任意の文字列 FloatType または DoubleType 列を解析する際の正の無限大の文字列表現。
preferDate true truefalse 可能な場合、タイムスタンプではなく日付として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。
quote " 1 文字 フィールド区切り記号が値に含まれる場合に、値のエスケープに使用する文字。
readerCaseSensitive true truefalse rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None 列名の文字列 データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
sep または delimiter , 文字列 列の間の区切り文字列。
singleVariantColumn None 列名の文字列 列名に設定すると、各フィールドを独自の列に解析するのではなく、CSV レコード全体をその名前の単一の VariantType 列に読み取ります。 header=true が必要です。
skipRows 0 正の整数または 0 コメントされた行や空の行を含む、無視する必要がある CSV ファイルの先頭からの行数。 header が true の場合、ヘッダーは最初にスキップされていない行とコメントされていない行になります。
timeFormat HH:mm:ss 時刻書式指定文字列 列の値 TimeType 解析するための形式。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 タイムスタンプ文字列を解析するための形式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。
timeZone None java.time.ZoneId文字列 タイムスタンプと日付を解析するときに使用する java.time.ZoneId
unescapedQuoteHandling STOP_AT_DELIMITER STOP_AT_CLOSING_QUOTEBACK_TO_DELIMITERSTOP_AT_DELIMITERSKIP_VALUERAISE_ERROR エスケープされていない引用符を処理するための方策。 許可される各オプションの動作は次のとおりです。
  • STOP_AT_CLOSING_QUOTE: 入力にエスケープされていない引用符が見つかった場合は、引用符文字を蓄積し、終了引用符が見つかるまで値を引用符で囲まれた値として解析します。
  • BACK_TO_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。 これにより、sep によって定義された区切り記号が見つかるまで、パーサーは現在解析対象となっている値のすべての文字を蓄積します。 値に区切り記号が見つからない場合は、区切り記号または行末が見つかるまで、入力の文字がパーサーによって蓄積され続けます。
  • STOP_AT_DELIMITER: 入力にエスケープされていない引用符が見つかった場合は、その値を引用符で囲まれていない値と見なします。 これにより、sep に定義した区切り記号または行末が入力内で見つかるまで、すべての文字がパーサーによって蓄積されます。
  • SKIP_VALUE: 入力にエスケープされていない引用符が見つかった場合、指定された値に対して解析されたコンテンツはスキップされ (次の区切り記号が見つかるまで)、nullValue で設定された値が代わりに生成されます。
  • RAISE_ERROR: 入力にエスケープされていない引用符が見つかった場合は、 TextParsingException がスローされます。

Excel

Excelファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
dataAddress None セル範囲またはシート名の文字列 Excel構文で読み取るセル範囲。 省略した場合は、最初のシートからすべての有効なセルを読み取ります。 SheetName!C5:H10を使用して、名前付きシートから範囲を読み取るか、最初のシートから範囲を読み取C5:H10するか、特定のシートからすべてのデータを読み取SheetNameします。
headerRows 0 01 列名ヘッダーとして使用する初期行の数。 dataAddressを指定すると、セル範囲内に適用されます。 0すると、列名は_c1_c2_c3などとして自動生成されます。
ignoreMissingSheet false truefalse dataAddressで指定されたシートを含まないファイルをサイレント スキップするかどうかを指定します。 falseすると、ファイルに要求されたシートがない場合にエラーがスローされます。 シート名が dataAddressで指定されている場合にのみ適用されます。
includePhoneticRuns false truefalse XLSX ファイルを読み取るときに、セル文字列値に連結されたふりがな (pinyin やふりがななど) 発音注釈を含めるかどうか。
operation readSheet readSheetlistSheets Excel ブックに対して実行する操作。 readSheet はシートからデータを読み取ります。 listSheets は、各シートのフィールド sheetIndex: long および sheetName: String を持つ構造体を返します。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 Excelに文字列として格納されるタイムスタンプなしのタイムゾーン値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。
dateFormat yyyy-MM-dd 日付書式指定文字列 Dateとして読み取られた文字列値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。

Json

JSON ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
allowBackslashEscapingAnyCharacter false truefalse バックスラッシュを使用して、後続の任意の 1 文字をエスケープすることを許可するかどうか。 有効にしない場合は、JSON の仕様に明示されている文字のみをエスケープできます。
allowComments false truefalse 解析対象のコンテンツ内で Java、C、および C++ スタイルのコメント ('/''*'、および '//' の種類) の使用を許可するかどうか。
allowNonNumericNumbers true truefalse 非数値 (NaN) トークンのセットを有効な浮動小数点数値として許可するかどうか。
allowNumericLeadingZeros false truefalse 追加の (無視できる) ゼロで始まる整数値を許可するかどうか (例: 000001)。
allowSingleQuotes true truefalse 単一引用符 (アポストロフィ、'\' 文字) を使用して、文字列 (名前と文字列値) を囲むことを許可するかどうか。
allowUnquotedControlChars false truefalse JSON 文字列に、エスケープされていない制御文字 (タブや改行文字など、値が 32 未満の ASCII 文字) を含めることを許可するかどうか。
allowUnquotedFieldNames false truefalse 引用符で囲まれていないフィールド名の使用を許可するかどうか。これは JavaScript では許可されますが、JSON 仕様では許可されません。
alternateVariantEncoding None Z85 ソース JSON の Variant 値に使用されるエンコード。 インライン JSON として格納されるのではなく、Base85 でエンコードされた Variant 値をデコードするには、 Z85 に設定します。
badRecordsPath None パス文字列 不正な JSON レコードに関する情報を記録するためのファイルを格納するパス。
ファイル ベースのデータ ソースで badRecordsPath オプションを使用する場合、次の制限があります。
  • これは非トランザクションであり、一貫性のない結果につながる可能性があります。
  • 一時的なエラーはエラーとして扱われます。
columnNameOfCorruptRecord _corrupt_record 列名の文字列 形式に誤りがあり、解析できないレコードを格納するための列。 解析の modeDROPMALFORMED に設定する場合、この列は空になります。
dateFormat yyyy-MM-dd 日付書式指定文字列 日付文字列を解析するための形式。
dropFieldIfAllNull false truefalse スキーマの推論中に、すべて null 値の列または空の配列および構造体を無視するかどうか。
encoding または charset UTF-8 java.nio.charset.Charset JSON ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。 UTF-16UTF-32 の場合、multilinetrue を使用することはできません。
inferTimestamp false truefalse タイムスタンプ文字列を TimestampType として推論を試みるかどうか。 trueに設定すると、スキーマの推論に著しく長い時間がかかる場合があります。 自動ローダーで使うには cloudFiles.inferColumnTypes を有効にする必要があります。
lineSep なし。 \r\r\n、および \n 文字列 連続する 2 つの JSON レコードの間の文字列。
locale US java.util.Locale識別子 JSON 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与えるJavaロケール識別子。
maxNestingDepth 500 正の整数 JSON オブジェクトと配列で許容される入れ子の深さの最大値。 深く入れ子になったドキュメントの場合は、この値を大きくします。
maxNumLen 1000 正の整数 JSON 入力内のトークン数の最大長。 大きな数値リテラルを含む JSON の場合は、この値を増やします。
maxStringLen 無制限 正の整数 JSON 入力内の文字列値の最大長。 大きな文字列で JSON を解析するときのメモリ使用量を制限するように設定します。
mode PERMISSIVE PERMISSIVEDROPMALFORMEDFAILFAST 形式に誤りがあるレコードの処理に関するパーサーのモード。
multiLine false truefalse JSON レコードが複数の行にまたがるかどうか。
prefersDecimal false truefalse 可能な場合は float 型や double 型の代わりに DecimalType として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。
primitivesAsString false truefalse 数値やブール値などのプリミティブ型を StringType として推論するかどうか。
readerCaseSensitive true truefalse rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 Databricks Runtime 13.3 以降で使用できます。
rescuedDataColumn None 列名の文字列 データ型の不一致またはスキーマの不一致 (列の大文字と小文字の区別を含む) によって解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
singleVariantColumn None 列名の文字列 JSON ドキュメント全体を取り込むかどうか。列の名前として指定された文字列を持つ単一の Variant 列に解析されます。 設定されていない場合、JSON フィールドは独自の列に取り込まれます。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 タイムスタンプ文字列を解析するための形式。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。
timeZone None java.time.ZoneId文字列 タイムスタンプと日付を解析するときに使用する java.time.ZoneId
upgradeExceptionAsBadRecord false truefalse 型アップグレード例外 (たとえば、宣言された列型に値を拡大できない場合) を、例外をスローするのではなく、無効なレコードとして扱うかどうか。

カフカ

Kafka リーダー オプションの完全な一覧については、「 DataStreamReader Kafka オプション」を参照してください。 次のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。

Key デフォルト 有効な値 Description
endingOffsets latest latest、または JSON オフセット文字列 読み取りを停止する場所。 JSON 文字列では、 -1 は最新のオフセットです。 -2(最も早いオフセット) は、終了オフセットとして使用できません。 JSON オフセット文字列の例を次に示します: {"topicA":{"0":50,"1":-1}}
endingOffsetsByTimestamp None JSON タイムスタンプ文字列 タイムスタンプとして指定されたパーティションごとの終了オフセット (ミリ秒単位)。 たとえば、 {"topicA":{"0":2000,"1":3000}}と指定します。
endingTimestamp None 正の整数または 0 すべてのパーティションに適用されるグローバル終了タイムスタンプ (ミリ秒単位)。

オーク

ORC ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
mergeSchema false truefalse 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。

寄木細工

Parquet ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
datetimeRebaseMode LEGACY EXCEPTIONLEGACYCORRECTED ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。
int96RebaseMode LEGACY EXCEPTIONLEGACYCORRECTED ユリウス暦と予期的グレゴリオ暦の間の INT96 タイムスタンプ値のリベースを制御します。
mergeSchema false truefalse 複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。
readerCaseSensitive true truefalse rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None 列名の文字列 データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。

状態ストア

構造化ストリーミング状態データを読み取るために、 spark.read.format("statestore") またはテーブル値関数 read_statestore これらのオプションを使用します。 「構造化ストリーミング状態情報の読み取り」をご覧ください。

Key デフォルト 有効な値 Description
batchId 最新のバッチ ID 正の整数または 0 読み取り対象のバッチ。 クエリの以前の状態を照会するために使用します。 バッチはコミットされるべきですが、まだクリーンアップされていません。
operatorId 0 正の整数または 0 読み取り対象の演算子。 クエリに複数のステートフル演算子がある場合に使用します。
storeName DEFAULT 任意の文字列 読み取り対象の状態ストア名。 ステートフル 演算子に複数の状態ストア インスタンスがある場合に使用します。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。
joinSide None leftright ストリーム ストリーム結合の読み取り先となるターゲット側。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。
snapshotStartBatchId None 正の整数または 0 状態を読み取るときに開始点として使用するスナップショットのバッチ ID。 リーダーは、このスナップショットからの変更を batchIdまで再生することで、状態を再構築します。 スナップショットが破損している場合に便利です。 snapshotPartitionIdと共に指定する必要があります。 readChangeFeedでは使用できません。 変更ログのチェックポイント処理が有効になっている HDFS ベースの状態ストアと RocksDB 状態ストアをサポートします。 Databricks Runtime 15.4 LTS 以降で使用できます。
snapshotPartitionId None 正の整数または 0 指定した場合、クエリはこのパーティションのみを読み取ります。 snapshotStartBatchIdと共に指定する必要があります。 readChangeFeedでは使用できません。 Databricks Runtime 15.4 LTS 以降で使用できます。
readChangeFeed false truefalse trueすると、changeStartBatchIdchangeEndBatchIdの間のバッチの指定された範囲にわたって状態の変化を返します。 changeStartBatchId が必要です。 joinSidebatchIdsnapshotStartBatchId、またはsnapshotPartitionIdでは使用できません。 Databricks Runtime 16.4 LTS 以降で使用できます。
詳細については、「 構造化ストリーミング状態の変更の読み取り」を参照してください。
changeStartBatchId None 正の整数または 0 変更フィード範囲の開始バッチ ID。 readChangeFeedtrue の場合に必要です。 readChangeFeedtrue に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。
changeEndBatchId 最新のバッチ ID 正の整数または 0 変更フィード範囲の終了バッチ ID。 changeStartBatchId以上である必要があります。 readChangeFeedtrue に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。
stateVarName None 任意の文字列 読み取る状態変数名。 状態変数名は、init 演算子によって使用されるStatefulProcessortransformWithState関数内の各変数の一意の名前です。 transformWithState演算子を使用する場合は必須です。 Databricks Runtime 16.4 LTS 以降で使用できます。
readRegisteredTimers false truefalse trueすると、transformWithState演算子によって使用される登録済みタイマーを読み取ります。 transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。
flattenCollectionTypes true truefalse trueすると、マップとリストの状態変数に対して返されるレコードがフラット化されます。 falseすると、Spark SQL ArrayまたはMapとしてレコードが返されます。 transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。

テキスト

テキスト ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
encoding UTF-8 java.nio.charset.Charset テキスト ファイルの行区切り記号のエンコードの名前。 ファイルの内容はこのオプションの影響を受けず、as-is読み取られます。
lineSep なし。 \r\r\n 、および \n 文字列 連続する 2 つのテキスト レコード間の文字列。
wholeText false truefalse ファイルを単一レコードとして読み取るかどうか。

Xml

XML ファイルを読み取る場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
rowTag None 任意の文字列 行として扱う XML ファイルの行タグ。 XML <book> <page><page>...<book> の例では、適切な値は page です。 これは必須オプションです。
samplingRatio 1.0 0.0 から 1.0 に変更します スキーマ推論に使用される行の割合を定義します。 XML 組み込み関数はこのオプションを無視します。
excludeAttribute false truefalse 要素内の属性を除外するかどうか。
mode None PERMISSIVEDROPMALFORMEDFAILFAST 解析中に破損したレコードを処理するモードを許可します。
  • PERMISSIVE: 破損したレコードの場合は、columnNameOfCorruptRecord によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドを null に設定します。 破損したレコードを保持するには、ユーザー定義スキーマで string という名前の columnNameOfCorruptRecord 型フィールドを設定できます。 スキーマにこのフィールドがない場合、破損したレコードは解析中に削除されます。 スキーマを推論すると、パーサーは出力スキーマに columnNameOfCorruptRecord フィールドを暗黙的に追加します。
  • DROPMALFORMED: 破損したレコードを無視します。 このモードは XML 組み込み関数ではサポートされていません。
  • FAILFAST: パーサーが破損したレコードを検出すると、例外をスローします。
inferSchema true truefalse true の場合は、結果として得られる各データフレーム列に対して適切な型を推論しようとします。 false の場合、結果の列はすべて string 型です。 XML 組み込み関数はこのオプションを無視します。
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord 列名の文字列 PERMISSIVE モードで作成された形式の正しくない文字列を含む新しいフィールドの名前を変更できます。
attributePrefix None 任意の文字列 属性と要素を区別するための属性のプレフィックス。 これはフィールド名のプレフィックスになります。 既定値は _ です。 XML の読み取り時は空にすることができますが、書き込み時は空にすることはできません。 DataFrameWriter XML オプションにも適用されます。
valueTag _VALUE 任意の文字列 属性または子要素の要素も持つ要素内の文字データに使用されるタグ。 ユーザーがスキーマで valueTag フィールドを指定することもできますが、文字データが他の要素や属性と一緒に要素に存在する場合、スキーマ推論中に自動的に追加されます。 DataFrameWriter XML オプションにも適用されます。
encoding UTF-8 java.nio.charset.Charset 読み取りの場合は、指定されたエンコードの種類で XML ファイルをデコードします。 書き込みの場合は、保存される XML ファイルのエンコード (文字セット) を指定します。 XML 組み込み関数はこのオプションを無視します。 DataFrameWriter XML オプションにも適用されます。
ignoreSurroundingSpaces true truefalse 値を囲む空白をスキップする必要があるかどうか。 空白のみの文字データは無視されます。
rowValidationXSDPath None ファイル パス文字列 各行の省略可能な XML を個別に検証するために使用される XSD ファイルへのパス。 検証に失敗した行は、解析エラーのように扱われます。 XSD は、指定されているか推論されたかに関係なく、スキーマには影響しません。
ignoreNamespace false truefalse true場合、XML 要素と属性に対する名前空間のプレフィックスは無視されます。 たとえば、タグ <abc:author><def:author> は、どちらも単なる <author> として扱われます。 rowTag 要素では名前空間を無視することはできないため、その子要素の読み込みのみを取り扱うことが可能です。 false の場合でも、XML 解析は名前空間を認識しません。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 カスタムタイムスタンプの形式文字列であり、datetime パターンに従います。 これは timestamp 型に適用されます。 DataFrameWriter XML オプションにも適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイムゾーンを含まないタイムスタンプのカスタム形式の文字列で、datetime パターン形式に従っています。 これは TimestampNTZType 型に適用されます。 DataFrameWriter XML オプションにも適用されます。
dateFormat yyyy-MM-dd 日付書式指定文字列 カスタム日付形式の文字列であり、datetime パターン形式に従います。 これは、日付型に適用されます。 DataFrameWriter XML オプションにも適用されます。
locale en-US IETF BCP 47 言語タグ IETF BCP 47 形式の言語タグとしてロケールを設定します。 たとえば、locale は日付とタイムスタンプの解析中に使用されます。
nullValue null 任意の文字列 null 値の文字列表記を設定します。 これが null である場合、パーサーはフィールドの属性と要素を書き込みません。 DataFrameWriter XML オプションにも適用されます。
readerCaseSensitive true truefalse rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。
rescuedDataColumn None 列名の文字列 データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) のために解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。 COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。
singleVariantColumn none 列名の文字列 1 つのバリアント列の名前を指定します。 このオプションが読み取り用に指定されている場合は、指定されたオプション文字列値を列の名前として使用して、XML レコード全体を 1 つの Variant 列に解析します。 このオプションを書き込みに指定した場合は、1 つの Variant 列の値を XML ファイルに書き込みます。 DataFrameWriter XML オプションにも適用されます。
useLegacyXMLParser true truefalse レガシ XML パーサーを使用するかどうか。 レガシ パーサーでは、形式が正しくないコンテンツに対する検証の厳格さが低くなりますが、メモリ効率は低くなります。 より厳密な既定のパーサーをオプトインするには、 false に設定します。
wildcardColName xs_any 列名の文字列 ワイルドカード (xs:any) スキーマ要素に一致する XML 要素をキャプチャするために使用される列名。 rescuedDataColumnと一緒に使用することはできません。

DataStreamReader オプション

これらのオプションを DataStreamReader.option() と共に使用して、Delta Lake テーブルやその他のファイル ベースのソースからのストリーミング読み取りを構成します。

ファイル形式のオプション (JSON、CSV、Parquet など) については、「 DataFrameReader オプション」を参照してください。

自動ローダー (cloudFiles.*) オプションについては、「 自動ローダー」を参照してください。

Example

次の例では、Delta Lake テーブル ストリームのmaxFilesPerTrigger10を設定します。

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")

共通

Delta Lake テーブルとその他のファイル ベースのストリーミング ソースには、次のオプションが適用されます。

Key デフォルト 有効な値 Description
cleanSource off offdeletearchive ソース ファイルがストリームによって処理された後で処理する方法。 off は何のアクションも実行しません。 delete ソース ファイルを完全に削除します。 archive は、ファイルを sourceArchiveDirに移動します。 archiveに設定する場合は、sourceArchiveDirも設定する必要があります。 Delta Lake テーブル ストリーミングには適用されません。
fileNameOnly false truefalse 既に処理されているファイルを、完全なパスではなくファイル名のみで識別するかどうか。 trueすると、同じファイル名を持つ異なるパスにあるファイルは同じファイルとして扱われ、再処理されません。 Delta Lake テーブル ストリーミングには適用されません。
latestFirst false truefalse 各マイクロバッチ内で、最後に変更されたファイルを最初に処理するかどうか。 可能な限り迅速に最新のデータを処理する場合に便利です。 truemaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、maxFileAgeは無視されます。 Delta Lake テーブル ストリーミングには適用されません。
maxBytesPerTrigger None 正の整数 マイクロバッチごとに処理されるデータ量のソフト最大値。 最小の入力ユニットが上限を超えると、バッチが制限を超えて処理される場合があります。 maxFilesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。
自動ローダーの場合は、代わりに cloudFiles.maxBytesPerTrigger を使います。 「共通」を参照してください。
maxCachedFiles 10000 正の整数または 0 後続のマイクロバッチ用にキャッシュする未処理のファイルの最大数。 キャッシュをオフにするには、 0 に設定します。 ソース ディレクトリにトリガーごとに多数の新しいファイルが含まれている場合は、この値を大きくします。 Delta Lake テーブル ストリーミングには適用されません。
maxFileAge 7d 7d4h 現在のシステム時刻ではなく、最近変更されたファイルのタイムスタンプを基準とした、処理対象と見なされるファイルの最大有効期間。 このしきい値より古いファイルは無視されます。 latestFirsttrueされ、maxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合は無視されます。 Delta Lake テーブル ストリーミングには適用されません。
maxFilesPerTrigger 1000 Delta Lake と自動ローダー用。 その他のファイル ベースのソースに対する最大値はありません。 正の整数 各マイクロバッチで処理される新しいファイルの数の上限。 maxBytesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。
自動ローダーの場合は、代わりに cloudFiles.maxFilesPerTrigger を使います。 「共通」を参照してください。
sourceArchiveDir None パス文字列 cleanSourcearchive に設定されている場合のアーカイブ ディレクトリへのパス。 ソース ファイルは、処理後にこのパスに移動され、相対ディレクトリ構造が維持されます。 Delta Lake テーブル ストリーミングには適用されません。

自動ローダー

これらのオプションを cloudFiles ソースと共に使用して、クラウド ストレージからのインジェストをストリーミングするように 自動ローダー を構成します。 cloudFiles ソースに固有のオプションには、他のcloudFiles ソース オプションとは別の名前空間に保持するために、が付いています。

共通

次のオプションは、すべての自動ローダー構成に適用されます。

Key デフォルト 有効な値 Description
cloudFiles.allowOverwrites false truefalse 入力ディレクトリ ファイルの変更による既存のデータの上書きを許可するかどうか。
構成に関する注意事項については、「 ファイルが追加または上書きされたときに、自動ローダーによってファイルが再び処理されますか?」を参照してください。
cloudFiles.backfillInterval None 1 day1 week 自動ローダーは、特定の間隔で非同期バックフィルをトリガーできます。 詳細については、「 cloudFiles.backfillInterval を使用して通常のバックフィルをトリガーする」を参照してください。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.cleanSource OFF OFFDELETEMOVE 処理されたファイルを自動的に削除するか、入力ディレクトリから移動するか。 OFF (既定値) に設定すると、ファイルは削除されません。
DELETEに設定すると、自動ローダーは処理されてから 30 日後に自動的にファイルを削除します。 これを行うには、自動ローダーにソース ディレクトリへの書き込みアクセス許可が必要です。
MOVEに設定すると、自動ローダーは、ファイルが処理されてから 30 日後cloudFiles.cleanSource.moveDestination指定した場所に自動的に移動します。 これを行うには、自動ローダーには、ソース ディレクトリと移動場所に対する書き込みアクセス許可が必要です。
ファイルは、commit_timeテーブル値関数の結果でcloud_files_stateに null 以外の値がある場合に処理されたと見なされます。 テーブル値関数cloud_files_state参照してください。 処理後の 30 日間の追加待機は、 cloudFiles.cleanSource.retentionDurationを使用して構成できます。
cloudFiles.cleanSourceを有効にする前に、次の考慮事項を確認してください。
  • 最も高速なコンシューマーがファイルを削除し、低速のソースに取り込まれないため、ソースの場所からデータを消費するストリームが複数ある場合、Azure Databricksはこのオプションを使用しないことをお勧めします。
  • この機能を有効にするには、自動ローダーがそのチェックポイントで追加の状態を維持する必要があります。これにより、パフォーマンスのオーバーヘッドが発生しますが、テーブル値関数 cloud_files_state による可観測性が向上します。 テーブル値関数cloud_files_state参照してください。
  • cleanSource は、現在の設定を使用して、特定のファイルを MOVE するか DELETE するかを決定します。 たとえば、ファイルが最初に処理されたときに設定が MOVE されたが、30 日後にファイルがクリーンアップの候補になったときに DELETE に変更されたとします。 この場合、cleanSource によってファイルが削除されます。
  • ファイルは、 retentionDuration の有効期限が切れるとすぐに消去される保証はありません。 コストを抑えるために、自動ローダーはストリーム処理と同時にファイルを削除し、ストリーム処理が完了するか終了するとすぐに終了します。 クリーンアップの候補であったが、ストリーム処理中にクリーンアップできなかったファイルは、次回の自動ローダーの実行時に取得されます。

Databricks Runtime 16.4 以降で使用できます。
cloudFiles.cleanSource.retentionDuration 30 days calendarInterval 文字列 (14 days2 weeks、または1 month 処理されたファイルが cleanSourceを使用したアーカイブの候補になるまでの待機時間。 DELETEの場合は 7 日を超える必要があります。 MOVEの最小制限はありません。
Databricks Runtime 16.4 以降で使用できます。
cloudFiles.cleanSource.moveDestination None クラウド ストレージまたは Unity カタログのボリューム パス cloudFiles.cleanSourceMOVEに設定されている場合に処理されたファイルをアーカイブするパス。 クラウド ストレージ パスまたは Unity カタログ ボリューム パス ( /Volumes/my_catalog/my_schema/my_volume/archive/ など) を指定できます。
移動場所は次の条件を満たす必要があります。
  • ソース ディレクトリの子ではありません。 移動先をソース ディレクトリ内に配置すると、アーカイブされたファイルが再び取り込まれます。
  • ソースと同じ外部の場所、ボリューム、または DBFS マウント内にある。 クロスバケットとクロスコンテナーの移動はサポートされていないため、エラーが発生します。

自動ローダーには、このディレクトリへの書き込みアクセス許可が必要です。
Databricks Runtime 16.4 以降で使用できます。
cloudFiles.format なし (必須オプション) avrobinaryFilecsvjsonorcparquettextxml ソース パスのデータ ファイル形式。 有効な値は次のとおりです。
cloudFiles.includeExistingFiles true truefalse ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着した新しいファイルのみを処理するか。 このオプションは、初めてストリームを開始するときにのみ評価されます。 ストリームの再起動後にこのオプションを変更した場合、効果はありません。
cloudFiles.inferColumnTypes false truefalse スキーマの推論を利用するときに、正確な列型を推論するかどうか。 既定では、列は JSON および CSV データセットを推論するときに文字列として推論されます。 詳細については、スキーマの推論に関する説明を参照してください。
cloudFiles.maxBytesPerTrigger None 次のようなバイト文字列 10g 各トリガーで処理される新しいバイトの最大数。 これはソフト最大値です。 それぞれ 3 GB のファイルがある場合、Azure Databricksはマイクロバッチで 12 GB を処理します。 個々のファイルがマイクロバッチに分割されることはありません。サイズがこの制限を超えた場合でも、常に 1 つの範囲内で完全に処理されます。 cloudFiles.maxFilesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 このオプションは、Trigger.Once() (Trigger.Once() は非推奨) と共に使用しても無効です。
Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。
cloudFiles.maxFileAge None 期間文字列 重複排除を目的としてファイル イベントを追跡する期間。 Databricks では、1 時間に数百万のファイルの順序でデータを取り込む場合でない限り、このパラメーターのチューニングは推奨しません。 詳細については、 ファイル イベントの追跡 に関するセクションを参照してください。
cloudFiles.maxFileAge のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。
cloudFiles.maxFilesPerTrigger 1000 正の整数 各トリガーで処理される新しいファイルの最大数。 cloudFiles.maxBytesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 Trigger.Once() (非推奨) と一緒に使用すると、このオプションは無効です。
Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。
cloudFiles.partitionColumns None 列名のコンマ区切りリスト ファイルのディレクトリ構造から推論する Hive スタイルのパーティション列のコンマ区切りの一覧。 Hive スタイルのパーティション列は、 <base-path>/a=x/b=1/c=y/file.formatなどの等値記号で結合されたキーと値のペアです。 この例では、パーティション列は abc です。 既定では、スキーマ推論を使用し、データを読み込む <base-path> を指定している場合、これらの列は自動的にスキーマに追加されます。 スキーマを指定した場合、自動ローダーでは、これらの列がスキーマに含まれることが想定されます。 これらの列をスキーマの一部に含めない場合は、"" を指定して、これらの列を無視することができます。 さらに、下の例のような複雑なディレクトリ構造のファイル パスから列を推論するときに、このオプションを使用できます。
<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv
cloudFiles.partitionColumnsとしてyear,month,dayを指定すると、year=2022file1.csvが返されますが、month列とday列はnull
monthday は、 file2.csvfile3.csvに対して正しく解析されます。
cloudFiles.schemaEvolutionMode addNewColumns スキーマが指定されていない場合は none それ以外の場合 addNewColumnsnonerescuefailOnNewColumns 新しい列がデータで検出された場合にスキーマを展開するモード。 既定では、列は JSON データセットを推論するときに文字列として推論されます。 詳細については、スキーマの展開に関する説明を参照してください。
cloudFiles.schemaHints None スキーマ文字列 スキーマ推論中に自動ローダーに指定するスキーマ情報。 詳細については、スキーマ ヒントに関するページを参照してください。
cloudFiles.schemaLocation なし (スキーマを推論するために必要) パス文字列 推論されたスキーマとそれ以降の変更を保存する場所。 詳細については、スキーマの推論に関する説明を参照してください。
cloudFiles.useStrictGlobber false truefalse Apache Spark の他のファイル ソースの既定のグロビング動作に一致する厳密な globber を使用するかどうか。 詳細については、「一般的なデータ読み込みパターン」を参照してください。 Databricks Runtime 12.2 LTS 以降で使用できます。
cloudFiles.validateOptions true truefalse 自動ローダー オプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。

ディレクトリの一覧

ディレクトリ一覧モードを使用する場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
cloudFiles.useIncrementalListing (非推奨) autoDatabricks Runtime 17.2 以降では、Databricks Runtime 17.3 以降でfalse autotruefalse この機能は廃止されました。 Databricks では、の代わりにcloudFiles.useIncrementalListingを使用することをお勧めします。
ディレクトリ一覧モードで、完全な一覧ではなく、増分一覧を使用するかどうか。 既定では、自動ローダーは、特定のディレクトリが増分一覧に該当する場合に、ベスト エフォートで自動検出を行います。 これを true または false に設定することで、増分一覧または完全なディレクトリ一覧を明示的に使用できます。
構文指定されていないディレクトリでインクリメンタル リストを誤って有効にすると、自動ローダーが新しいファイルを検出できなくなります。
Azure Data Lake Storage (abfss://)、S3 (s3://)、GCS (gs://) で動作します。
Databricks Runtime 9.1 LTS 以降で使用できます。

ファイル通知

必要なクラウドのアクセス許可、セットアップ手順、認証方法など、ファイル通知モードの構成については、「 ファイル通知モードでの自動ローダー ストリームの構成」を参照してください。

Key デフォルト 有効な値 Description
cloudFiles.fetchParallelism 1 正の整数 キュー サービスからメッセージを取得するときに使用するスレッドの数。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.pathRewrites None JSON マップ文字列 複数の S3 バケットからファイル通知を受信する queueUrl を指定し、これらのコンテナー内のデータにアクセスするように構成されたマウント ポイントを使用する場合にのみ必要です。 bucket/key パスのプレフィックスをマウント ポイントで書き換える場合は、このオプションを使用します。 プレフィックスのみ書き換え可能です。 たとえば、構成 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パス s3://<databricks-mounted-bucket>/path/2017/08/fileA.jsondbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
cloudFiles.resourceTag None キーと値のタグ文字列 関連するリソースの関連付けと識別に役立つ一連のキーと値のタグ ペア。次に例を示します。
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。 代わりに、クラウド プロバイダー コンソールを使用してリソース タグを設定します。
詳細については、「 クラウド プロバイダーのリソース タグ」を参照してください。
cloudFiles.useManagedFileEvents false truefalse trueに設定すると、自動ローダーはファイル イベント サービスを使用して、外部の場所にあるファイルを検出します。 このオプションは、読み込みパスがファイル イベントが有効になっている外部の場所にある場合にのみ使用できます。 ファイル イベントでファイル通知モードを使用するを参照してください。
自動ローダーは前回の実行後に新しいファイルを検出できるため、ファイル イベントはファイル検出で通知レベルのパフォーマンスを提供します。 ディレクトリ一覧とは異なり、このプロセスではディレクトリ内のすべてのファイルを一覧表示する必要はありません。
ファイル イベント オプションが有効になっている場合でも、自動ローダーでディレクトリ 一覧が使用される場合があります。
  • 初期読み込み中に、 includeExistingFilestrue に設定されている場合、完全なディレクトリ一覧が実行され、自動ローダーが開始される前にディレクトリに存在していたすべてのファイルが検出されます。
  • ファイル イベント サービスは、最近作成されたファイルをキャッシュすることで、ファイルの検出を最適化します。 自動ローダーの実行頻度が低い場合、このキャッシュの有効期限が切れる可能性があり、自動ローダーはディレクトリ一覧にフォールバックしてファイルを検出し、キャッシュを更新します。 このシナリオを回避するには、少なくとも 7 日に 1 回は自動ローダーを呼び出します。

自動ローダーでこのオプション を指定してディレクトリ一覧を使用 する場合の状況の包括的な一覧については、「ファイル イベントを含む自動ローダーでディレクトリ一覧を使用するタイミング」を参照してください。
Databricks Runtime 14.3 LTS 以降で使用できます。
cloudFiles.listOnStart false truefalse trueに設定すると、自動ローダーは、チェックポイントの継続トークンで始まるのではなく、ストリームの開始時に完全なディレクトリ一覧を実行します。 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復するには、このオプションを使用します。 CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN エラーから回復する方法」を参照してください。
cloudFiles.useNotifications false truefalse ファイル通知モードを使用して、新しいファイルがあるときを判断するかどうか。 false の場合は、ディレクトリ一覧モードを使用します。 「自動ローダー ファイル検出モードを比較する」を参照してください。
cloudFiles.useManagedFileEventstrue に設定されている場合は使用しないでください。
クラウド プロバイダーのリソース タグ

自動ローダーでは、ベスト エフォートベースで次のキーと値のタグのペアが既定で追加されます。

  • vendor: Databricks
  • path: データが読み込まれる場所。 ラベル付けの制限のため、GCP では使用できません。
  • checkpointLocation: ストリームのチェックポイントの場所。 ラベル付けの制限のため、GCP では使用できません。
  • streamId: ストリームのグローバルに一意な識別子。

Databricks はこれらのキー名を予約し、それらの値を上書きすることはできません。

Azure の詳細については、キューとメタデータの名前付けと、properties.labelsイベント サブスクリプションに関する情報をご覧ください。 自動ローダーは、これらのキーと値のタグ ペアを JSON にラベルとして保存します。

クラウド固有

自動ローダーには、ファイル通知モード用にクラウド インフラストラクチャを構成するためのオプションがあります。 必要なクラウドのアクセス許可とセットアップ手順については、「 ファイル通知モードで自動ローダー ストリームを構成する」を参照してください。

Azure

cloudFiles.useNotifications = trueを指定し、自動ローダーで通知サービスを設定する場合は、次のすべてのオプションの値を指定する必要があります。

Key デフォルト 有効な値 Description
cloudFiles.resourceGroup None 任意の文字列 ストレージ アカウントが作成されるAzure リソース グループ。
cloudFiles.subscriptionId None 任意の文字列 リソース グループが作成されるAzure サブスクリプション ID。
databricks.serviceCredential None 任意の文字列 Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。

Databricks サービスの資格情報を使用できない場合は、代わりに次の認証オプションを指定できます。

Key デフォルト 有効な値 Description
cloudFiles.clientId None 任意の文字列 サービス プリンシパルのクライアント ID またはアプリケーション ID。
cloudFiles.clientSecret None 任意の文字列 サービス プリンシパルのクライアント シークレット。
cloudFiles.connectionString None 接続の文字列 アカウント アクセス キーまたは Shared Access Signature (SAS) に基づくストレージ アカウントの接続文字列。
cloudFiles.tenantId None 任意の文字列 サービス プリンシパルが作成されるAzure テナント ID。

cloudFiles.useNotifications = trueを設定し、自動ローダーで既存のキューを使用する場合にのみ、次のオプションを指定します。

Key デフォルト 有効な値 Description
cloudFiles.queueName None 任意の文字列 Azure キューの名前。 指定した場合、クラウド ファイル ソースは、独自のAzure Event Gridおよび Queue Storage サービスを設定する代わりに、このキューからのイベントを直接使用します。 その場合、databricks.serviceCredential または cloudFiles.connectionString では、キューに対する読み取りアクセス許可のみが必要です。

Delta Lake

次のオプションは、 spark.readStreamを使用して Delta Lake テーブルから読み取るときに適用されます。

Key デフォルト 有効な値 Description
allowSourceColumnDrop None バージョン番号または always 差分テーブルのバージョン番号または always に設定すると、ソース テーブル スキーマから列が削除された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
allowSourceColumnRename None バージョン番号または always 差分テーブルのバージョン番号または always に設定すると、ソース テーブル内の列の名前が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
allowSourceColumnTypeChange None バージョン番号または always 差分テーブルのバージョン番号または always に設定すると、ソース テーブルで列の種類が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。 schemaTrackingLocation が必要です。 「タイプ拡張」を参照してください。
excludeRegex None Java正規表現文字列 正規表現パターン。 パスがパターンと一致するファイルは、ストリーミング読み取りから除外されます。 予期される名前付け規則に準拠していないファイルをフィルターで除外する場合に便利です。
failOnDataLoss true truefalse ログの保持 (logRetentionDuration) のためにソース データが削除された場合にストリーミング クエリを失敗させるかどうか。 不足しているデータをスキップして処理を続行するには、 false に設定します。 「タイム トラベル クエリのデータ保持を構成する」を参照してください。
ignoreChanges (非推奨) false truefalse Databricks Runtime 11.3 LTS 以前で使用できます。 UPDATEMERGE INTODELETEOVERWRITEなどの変更操作後に、書き換えられたデータ ファイルを再出力します。 変更されていない行は新しい行と共に出力される可能性があるため、ダウンストリーム コンシューマーは重複を処理する必要があります。 削除はダウンストリームには反映されません。 Databricks Runtime 12.2 LTS 以降の skipChangeCommits に置き換えられました。
ignoreDeletes (非推奨) false truefalse パーティション境界でデータを削除するトランザクションを無視します (パーティションの完全な削除のみ)。 パーティション以外の削除、更新、またはその他の変更は処理しません。 skipChangeCommits を代わりに使用します。
readChangeFeed または readChangeData false truefalse ストリーミング クエリの変更データ フィードの読み取りを有効にするかどうかを指定します。 有効にすると、ストリームは、追加のメタデータ列を含む行レベルの変更 (挿入、更新、および削除) を出力します。 Azure Databricksでの変更データ フィードの使用を参照してください。
schemaTrackingLocation None パス文字列 Delta Lake がストリーミング読み取りのスキーマ変更を追跡するディレクトリへのパス。 列マッピングが有効になっているテーブルからストリーミングし、スキーマの進化を処理するために allowSourceColumn* オプションを使用する場合に必要です。 ストリーミング クエリの checkpointLocation 内にある必要があります。 「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。
skipChangeCommits false truefalse 既存のレコードを削除または変更するトランザクションを無視し、追加のみを処理します。 Databricks では、変更データ フィードを使用しないほとんどのワークロードに対して、このオプションをお勧めします。 Databricks Runtime 12.2 LTS 以降で使用できます。 skipChangeCommitsを使用したアップストリーム変更コミットのスキップを参照してください。
startingTimestamp 利用可能な最新 2019-01-01T00:00:00.000Zなどのタイムスタンプ文字列または日付文字列 (例:2019-01-01 読み取りを開始するタイムスタンプ。 ストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。 使用可能なすべてのテーブルコミットの前にタイムスタンプがある場合、ストリームは使用可能な最も早いコミットから開始されます。 startingVersionと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。
startingVersion 利用可能な最新 正の整数、 0、または latest 読み取りを開始する差分テーブルのバージョン。 ストリームは、指定したバージョン以降にコミットされたすべての変更を読み取ります。 最新の変更からのみ開始する latest を指定します。 startingTimestampと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。 テーブル履歴の操作を参照してください。
withEventTimeOrder false truefalse 初期テーブル スナップショットをイベント時間バケットに分割して、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフル クエリで削除されないようにします。 チェックポイントを削除しないと、初期スナップショット処理が開始された後は変更できません。 Databricks Runtime 11.3 LTS 以降で使用できます。 「データを削除せずに初期スナップショットを処理する」を参照してください。

カフカ

spark.readStream.format("kafka")またはspark.read.format("kafka")で次のオプションを使用します。

Key デフォルト 有効な値 Description
assign None 次のような JSON 文字列 {"topicA":[0,1],"topicB":[2,4]} 使用する特定のパーティション。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。
failOnDataLoss true truefalse 削除されたトピックやオフセットの切り捨てなどによってデータが失われた可能性がある場合にクエリを失敗させるかどうか。 不足しているデータをスキップして続行するには、 false に設定します。
Databricks は、データが失われた可能性があるかどうかを保守的に見積もります。 ただし、誤ったアラームが発生する可能性があります。
fetchoffset.numretries 3 正の整数または 0 Kafka オフセットのフェッチが失敗したときの再試行回数。
fetchoffset.retryintervalms 1000 正の整数または 0 オフセット フェッチの再試行間隔 (ミリ秒単位)。
groupIdPrefix spark-kafka-source (ストリーミング)、 spark-kafka-relation (バッチ) 任意の文字列 自動生成された Kafka コンシューマー グループ ID に使用するカスタマイズされたプレフィックス。 kafka.group.idが明示的に設定されている場合、コネクタはこのオプションを無視します。
kafka.group.id None 任意の文字列 読み取り時に使用する Kafka コンシューマー グループ ID。 使用には注意が必要です。同じグループ ID を共有するクエリは相互に干渉し、部分的なデータのみを読み取る可能性があります。 これは、同時実行バッチとストリーミング ワークロードを実行するとき、またはクエリをすばやく再起動するときに発生する可能性があります。 設定した場合、 groupIdPrefix は無視されます。 問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を小さな値に設定します。
includeHeaders false truefalse Kafka メッセージ ヘッダーを列として出力に含めるかどうか。
kafkaconsumer.polltimeoutms None 正の整数 Kafka コンシューマー poll() 呼び出しのタイムアウト (ミリ秒)。
kafka.bootstrap.servers None host:port文字列のコンマ区切りリスト Kafka ブローカーの host:port アドレスのコンマ区切りのリスト。 Kafka クライアントの bootstrap.servers プロパティを設定します。
Kafka からのデータがない場合は、このブローカーのアドレス一覧で正しくないアドレスを確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 Kafka クライアントは、ブローカーが最終的に使用可能になると想定し、ネットワーク エラーを受信したときに永久に再試行します。
maxRecordsPerPartition None 正の整数 各 Spark パーティションのレコードの最大数。 設定すると、コネクタは Kafka パーティションを分割して、各 Spark パーティションが最大でこの多くのレコードを読み取るようにします。
このオプションは、 minPartitionsで使用することもできます。 両方のオプションが設定されている場合、Spark はどちらのオプションを使用してもパーティションが増えます。
minPartitions None 正の整数 Kafka から読み取る Spark パーティションの最小数。 設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。 設定しない場合、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。 データ スキューやピーク時の負荷を処理する場合に便利です。
このオプションは、トリガーごとに Kafka コンシューマーを再初期化します。これは、SSL のパフォーマンスに影響する可能性があります。
startingOffsets latest (ストリーミング)、 earliest (バッチ) earliestlatest、または JSON オフセット文字列 クエリが読み取りを開始するオフセット。 JSON 文字列では、 -1 は最新のオフセットです。 -2 は最も早いオフセットです。 たとえば、 {"topicA":{"0":23,"1":-2}}と指定します。
ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。
バッチ クエリの場合、 latest は許可されません。
startingOffsetsByTimestamp None JSON タイムスタンプ文字列 (例: {"topicA":{"0":1000,"1":2000}} 各パーティションの開始オフセットのリスト。タイムスタンプとしてミリ秒単位で指定されます。 タイムスタンプのオフセットが存在しない場合、クエリの動作は startingOffsetsByTimestampStrategyによって決定されます。
ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。
startingOffsetsByTimestampStrategy error errorlatest startingOffsetsByTimestampまたはstartingTimestampで指定されたタイムスタンプのオフセットが見つからない場合に使用する戦略。 error は例外を発生させます。 latest では、使用可能な最新のオフセットが使用されます。
startingTimestamp None 正の整数または 0 すべてのパーティションに適用されるグローバル開始タイムスタンプ (ミリ秒単位)。 タイムスタンプのオフセットが存在しない場合、動作は startingOffsetsByTimestampStrategyによって制御されます。
subscribe None トピック名のコンマ区切りリスト サブスクライブするトピック。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。
subscribePattern None Java正規表現文字列 トピックのサブスクライブに使用されるパターン。 subscribesubscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 たとえば、「 topic.* 」のように入力します。

次のオプションは、 spark.readStream.format("kafka")を使用したストリーミング読み取りにのみ適用されます。

Key デフォルト 有効な値 Description
bytesEstimateWindowLength 300s 10mや ɦ>などの期間文字列600s estimatedTotalBytesBehindLatest メトリックの残りのバイト数を見積もるために使用される時間枠。 詳しくは、Kafka メトリックを取得するをご覧ください。
maxOffsetsPerTrigger None 正の整数 トリガー間隔ごとに処理するオフセットの最大数。 オフセットは、トピック パーティション間で比例して分散されます。
maxTriggerDelay 15m 10mや ɦ>などの期間文字列600s トリガーする前に minOffsetsPerTrigger が蓄積されるまで待機する最大時間。
minOffsetsPerTrigger None 正の整数 マイクロバッチをトリガーする前に累積するオフセットの最小数。 maxTriggerDelayに達すると、マイクロバッチは関係なく実行されます。

spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセット オプションについては、「DataFrameReader Kafka オプション」を参照してください。

認証

Databricks では、Unity カタログ サービスの資格情報を使用して、クラウドで管理される Kafka サービス (AWS MSK、Azure Event Hubs、または Google Cloud Managed Kafka) に対する認証を行うことをお勧めします。

Key デフォルト 有効な値 Description
databricks.serviceCredential None 任意の文字列 クラウドで管理される Kafka サービスに対して認証するための Unity カタログ サービス 資格情報 の名前。 Databricks Runtime 16.1 以降で使用できます。
databricks.serviceCredential.scope None 任意の文字列 サービス資格情報の OAuth スコープ。 これは、Azure Databricksが Kafka サービスのスコープを自動的に推論できない場合にのみ設定します。

サービス資格情報を使用できない場合は、SASL/SSL オプション ( kafka.* プロパティとして渡されます) を使用します。 サービス資格情報を使用する場合、 kafka.sasl.mechanismkafka.sasl.jaas.config、または kafka.security.protocolを指定する必要はありません。

Key デフォルト 有効な値 Description
kafka.security.protocol None セキュリティ プロトコル文字列 ( SASL_SSLSSLPLAINTEXT ブローカー通信のセキュリティ プロトコル。
kafka.sasl.mechanism None SASL メカニズム文字列 ( PLAINSCRAM-SHA-256SCRAM-SHA-512OAUTHBEARERなど) AWS_MSK_IAM SASL メカニズム。
kafka.sasl.jaas.config None JAAS 構成文字列 JAAS ログイン構成文字列。
kafka.sasl.login.callback.handler.class None 完全修飾クラス名 SASL 認証のログイン コールバック ハンドラーの完全修飾クラス名。
kafka.sasl.client.callback.handler.class None 完全修飾クラス名 SASL 認証用のクライアント コールバック ハンドラーの完全修飾クラス名。
kafka.ssl.truststore.location None ファイル パス文字列 SSL 信頼ストア ファイルへのパス。
kafka.ssl.truststore.password None 任意の文字列 SSL 信頼ストア ファイルのパスワード。
kafka.ssl.keystore.location None ファイル パス文字列 SSL キー ストア ファイルへのパス。
kafka.ssl.keystore.password None 任意の文字列 SSL キー ストア ファイルのパスワード。

認証のセットアップ手順の詳細については、「 認証」を参照してください。

Pub/Sub

これらのオプションを spark.readStream.format("pubsub") と共に使用して、Google Pub/Sub にサブスクライブします。 オプション subscriptionIdtopicId、および projectId が必要です。

Key デフォルト 有効な値 Description
subscriptionId None 任意の文字列 必須。 Pub/Sub サブスクリプション ID。 サブスクリプションが存在しない場合は、コネクタによって作成されます。
topicId None 任意の文字列 必須。 Pub/Sub トピック ID。
projectId None 任意の文字列 必須。 Google Cloud プロジェクト ID。
numFetchPartitions ストリーム初期化時に使用可能な Executor の数の半分 正の整数 サブスクリプションから行をフェッチする並列 Spark タスクの数。
maxBytesPerTrigger None 正の整数 マイクロバッチごとに処理するバイト数のソフト制限。
maxRecordsPerFetch 1000 正の整数 処理前にタスクごとにフェッチする行数。
maxFetchPeriod 10s 1s1m 各タスクが行を処理する前に取得する時間間隔。 Azure Databricksでは、既定値を使用することをお勧めします。
deleteSubscriptionOnStreamStop false truefalse trueすると、ストリーミング クエリの終了時に、subscriptionIdのサブスクリプションが削除されます。
serviceCredential None 任意の文字列 Pub/Sub に対して認証するためのAzure Databricks サービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。
clientEmail None 電子メール アドレス文字列 Google サービス アカウントのメール アドレス。 サービス資格情報を使用しない場合に必要です。
clientId None 任意の文字列 Google Service Account のクライアント ID。 サービス資格情報を使用しない場合に必要です。
privateKey None 秘密キー文字列 Google サービス アカウントの秘密キー。 サービス資格情報を使用しない場合に必要です。
privateKeyId None 任意の文字列 Google サービス アカウントの秘密キー ID。 サービス資格情報を使用しない場合に必要です。

Pub/Sub の詳細については、「 Google Pub/Sub のサブスクライブ」を参照してください。

パルサー

Apache Pulsar からストリーミングするには、これらのオプションを spark.readStream.format("pulsar") と共に使用します。 Databricks Runtime 14.1 以降で使用できます。

次のオプションが必要です。 topictopics、またはtopicsPatternのいずれかを指定する必要があります。

Key デフォルト 有効な値 Description
service.url None Pulsar サービスの URL 文字列 Pulsar サービスの Pulsar serviceURL (例: pulsar://broker.example.com:6650)。
topic None 任意の文字列 使用する 1 つのトピック名。
topics None トピック名のコンマ区切りリスト 使用するトピック名のコンマ区切りのリスト。
topicsPattern None Java正規表現文字列 トピック名と一致するJava正規表現文字列。

次のオプションもサポートされています。

Key デフォルト 有効な値 Description
admin.url None URL 文字列 Pulsar 管理サービスの HTTP URL。 maxBytesPerTriggerが設定されている場合に必要です。
allowDifferentTopicSchemas false truefalse スキーマが異なる複数のトピックを読み取る場合は、このオプションを使用して、スキーマベースのトピック値の自動逆シリアル化を無効にします。 これが true の場合は、生の値のみが返されます。
failOnDataLoss true truefalse データが失われたときにクエリを失敗させるかどうか。 たとえば、アイテム保持ポリシーが原因でトピックが削除されたり、メッセージの有効期限が切れたりすると、データ損失が発生する可能性があります。
maxBytesPerTrigger None 正の整数 マイクロバッチごとに処理するバイト数のソフト制限。 admin.url が必要です。
pollTimeoutMs 120000 正の整数 Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。
predefinedSubscription None 任意の文字列 Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される定義済みのサブスクリプション名。
startingOffsets latest latestearliest、または JSON オフセット文字列 読み取りを開始する場所。
subscriptionPrefix None 任意の文字列 Spark アプリケーションの進行状況を追跡するランダム なサブスクリプションを生成するためにコネクタによって使用されるプレフィックス。
waitingForNonExistedTopic false truefalse コネクタが目的のトピックが作成されるまで待機するかどうか。

次のオプション パターンを使用して、追加の Pulsar クライアント、管理者、リーダーの構成を指定できます。

Pattern 構成オプション
pulsar.admin.* Pulsar 管理者構成
pulsar.client.* pulsar.client.authPluginClassNameなどの認証オプションを含む pulsar.client.authParams
pulsar.reader.* Pulsar リーダー構成

Pulsar クライアントと管理者認証オプションの詳細については、「 認証」を参照してください。

認証

Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Azure Databricksでは、シークレットを使用して認証の詳細を格納することをお勧めします。 「シークレットの管理」を参照してください。

Key デフォルト 有効な値 Description
pulsar.client.authPluginClassName None 完全修飾クラス名 認証プラグインの完全修飾クラス名。 たとえば、「 org.apache.pulsar.client.impl.auth.AuthenticationTls 」のように入力します。
pulsar.client.authParams None 資格情報文字列 認証プラグインに文字列として渡される認証資格情報。 たとえば、「 tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem 」のように入力します。
pulsar.client.useKeyStoreTls false truefalse trueすると、PEM 形式のファイルではなく、KeyStore ベースの TLS 構成が有効になります。
pulsar.client.tlsTrustStoreType None 任意の文字列 TLS 信頼ストア ファイルの形式。 たとえば、「 JKS 」のように入力します。
pulsar.client.tlsTrustStorePath None ファイル パス文字列 信頼された CA 証明書を含む TLS 信頼ストア ファイルへのパス。 pulsar.client.useKeyStoreTlstrue の場合に必要です。
pulsar.client.tlsTrustStorePassword None 任意の文字列 TLS 信頼ストア ファイルのパスワード。

ストリームで PulsarAdminを使用する場合は、次のオプションを設定することもできます。

Key デフォルト 有効な値 Description
pulsar.admin.authPluginClassName None 完全修飾クラス名 Pulsar 管理クライアントの認証プラグインの完全修飾クラス名。
pulsar.admin.authParams None 資格情報文字列 Pulsar 管理者クライアント認証プラグインの認証資格情報。
pulsar.admin.useTls None truefalse Pulsar 管理者クライアント接続に TLS を使用するかどうか。
pulsar.admin.tlsAllowInsecureConnection None truefalse Pulsar 管理クライアントの安全でない TLS 接続を許可するかどうか。
pulsar.admin.tlsTrustCertsFilePath None ファイル パス文字列 Pulsar 管理クライアントの信頼された TLS 証明書ファイルへのパス。
pulsar.admin.useKeyStoreTls None truefalse Pulsar 管理クライアントに KeyStore ベースの TLS を使用するかどうか。
pulsar.admin.tlsTrustStoreType None 任意の文字列 Pulsar 管理クライアントの TLS 信頼ストアの形式。 たとえば、「 JKS 」のように入力します。
pulsar.admin.tlsTrustStorePath None ファイル パス文字列 Pulsar 管理クライアントの TLS 信頼ストア ファイルへのパス。 pulsar.admin.useKeyStoreTlstrue の場合に必要です。
pulsar.admin.tlsTrustStorePassword None 任意の文字列 Pulsar 管理者クライアント TLS 信頼ストアのパスワード。

認証の例については、「 Pulsar に対する認証」を参照してください。

DataFrameWriter オプション

これらのオプションを DataFrameWriter.option() および DataFrameWriterV2.option() と共に使用して、データAzure Databricks書き込む方法を制御します。

Example

次の例では、Delta Lake テーブルを書き込むためのmergeSchemaTrueを設定します。

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")

アブロ

Avro ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
avroSchema None JSON スキーマ文字列 JSON 文字列としての完全な Avro スキーマ。 Spark SQL 型を特定の Avro 型に変換するには、このオプションを使用します。 Avro ファイルの読み取りと書き込みに適用されます。
avroSchemaUrl None URL 文字列 Avro スキーマ ファイルを指す URL。 スキーマが外部に格納されている場合は、 avroSchema の代わりに使用します。 avroSchemaと相互に排他的です。 Avro ファイルの読み取りと書き込みに適用されます。
compression snappy uncompresseddeflatesnappy (default)bzip2xzzstandard 書き込み時に使用する圧縮コーデック。 Avro ファイルの読み取りと書き込みに適用されます。
recordName topLevelRecord 任意の文字列 出力 Avro スキーマの最上位レベルのレコード名。 Avro ファイルの読み取りと書き込みに適用されます。
positionalFieldMatching false truefalse Spark スキーマと Avro スキーマの間の列を名前ではなくフィールドの位置で照合するかどうか。 Avro ファイルの読み取りと書き込みに適用されます。
recordNamespace 空の文字列 任意の文字列 出力 Avro スキーマの最上位レコードの名前空間。 Avro ファイルの読み取りと書き込みに適用されます。

Delta Lake と Apache Iceberg

Delta Lake テーブルと Apache Iceberg テーブルを記述する場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
clusterByAuto false truefalse 自動液体クラスタリングを有効にするかどうか。Azure Databricksクエリ パターンに基づいてクラスタリング列が選択されます。 mode("overwrite")でのみ有効です。 append モードでは使用できません。 Databricks Runtime 16.4 以降で使用できます。 [テーブルに液体クラスタリングを使用する] に適用されます。
mergeSchema None truefalse 書き込み操作でスキーマの進化を有効にするかどうかを指定します。 ソース DataFrame の新しい列がターゲット テーブル スキーマに追加されます。 バッチおよびストリーミングの追加に適用されます。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。
overwriteSchema None truefalse 上書き時にテーブル スキーマとパーティション分割を置き換えるかどうか。 mode("overwrite")なしでreplaceWhereが必要です。 partitionOverwriteModeでは使用できません。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。
partitionOverwriteMode None staticdynamic パーティション上書きモード。 これを dynamic に設定すると、新しいデータを含むパーティションのみが上書きされ、他のすべてのパーティションは変更されません。 レガシ モード。サーバーレス コンピューティングまたは Databricks SQL ではサポートされていません。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceOn None ブール式の文字列 ソース クエリの行に置き換えるターゲット テーブル内の行と一致するブール式。 ターゲット テーブルとソース クエリの両方の列を参照できます。 ソース行と一致するターゲット内の行は削除され、置き換えられます。 ソースが空の場合、削除は行われません。 列参照のあいまいさを解消するには、 targetAlias を使用します。 Databricks Runtime 17.1 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceUsing None 列名のコンマ区切りリスト ターゲット テーブルとソース クエリの間の行を照合するために使用される列名のコンマ区切りのリスト。 ターゲットとソースの両方に、一覧表示されているすべての列が含まれている必要があります。 等値比較でソース行と一致するターゲット内の行は削除され、置き換えられます。 NULL 値は等しくないとして扱われ、一致しません。 Databricks Runtime 16.3 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
replaceWhere None 述語式の文字列 述語式。 述語に一致するレコードのみをアトミックに上書きします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
targetAlias None 任意の文字列 ターゲット テーブルの文字列エイリアス。 条件がターゲット テーブルとソース クエリの両方の列を参照する場合に、 replaceOn または replaceWhere を使用して列参照を明確にします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。
txnAppId None 任意の文字列 foreachBatch操作でのべき等書き込みのアプリケーションを識別する一意の文字列。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnVersion と共に使用します。 べき等テーブル書き込みにforeachBatchを使用するために適用されます。
txnVersion None 単調に増加する整数 foreachBatch操作でのべき等書き込みのトランザクション バージョンとして使用される単調に増加する数。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnAppId と共に使用します。 べき等テーブル書き込みにforeachBatchを使用するために適用されます。
optimizeWrite None truefalse この書き込み操作で自動最適化書き込みを有効にするかどうかを指定します。 spark.databricks.delta.optimizeWrite.enabled構成をオーバーライドします。 Azure Databricks?の Delta Lake に適用されます。
userMetadata None 任意の文字列 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。 DESCRIBE HISTORYの出力に表示されます。 カスタム メタデータを使用したテーブルのエンリッチに適用されます。

Csv

CSV ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
charToEscapeQuoteEscaping \0 (無効) 1 文字 エスケープ文字が引用符文字と異なる場合にエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。
compression none none (default)bzip2gziplz4snappydeflatezstd 書き込み時に使用する圧縮コーデック。 csv (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付書式指定文字列 日付列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。
emptyValue 空の文字列 任意の文字列 空の (null 以外の) 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。
encoding UTF-8 java.nio.charset.Charset 出力ファイルの文字エンコード。 csv (DataFrameWriter) に適用されます。
escape \ 1 文字 引用符で囲まれた値をエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。
escapeQuotes true truefalse 引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。 csv (DataFrameWriter) に適用されます。
header false truefalse 出力の最初の行として列名を書き込むかどうか。 csv (DataFrameWriter) に適用されます。
ignoreLeadingWhiteSpace false truefalse 書き込み時に先頭の空白を値からトリミングするかどうか。 csv (DataFrameWriter) に適用されます。
ignoreTrailingWhiteSpace false truefalse 書き込み時に値から末尾の空白をトリミングするかどうかを指定します。 csv (DataFrameWriter) に適用されます。
lineSep \n 文字列 レコード間で使用される行区切り文字列。 csv (DataFrameWriter) に適用されます。
locale en-US java.util.Locale識別子 java.util.Locale 識別子。 CSV 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与える、識別されたJavaロケール。
nullValue 空の文字列 任意の文字列 null 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。
quote " 1 文字 区切り記号を含むフィールド値を引用符で囲む文字。 csv (DataFrameWriter) に適用されます。
quoteAll false truefalse 内容に関係なく、すべてのフィールド値を引用符で囲むかどうか。 csv (DataFrameWriter) に適用されます。
sep , 文字列 フィールド区切り文字。 csv (DataFrameWriter) に適用されます。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 タイムスタンプ列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。

Excel

Excel ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
dataAddress None シート名またはセル参照文字列 書き込みのシート名または開始セル。 省略した場合、セル Sheet1から始まるA1という名前のシートに書き込みます。 シート名 (SheetName) または単一セル参照 (SheetName!A1) を受け入れます。 セル範囲は書き込みではサポートされていません。
dateFormatInWrite yyyy-mm-dd Excel日付書式指定文字列 Excel c0 /< > 列に適用されるセル書式指定文字列です。 Excel書式構文を使用します。
headerRows 0 01 列名を最初の行として書き込むかどうか。
timestampNTZFormat yyyy-mm-dd hh:mm:ss Excel タイムスタンプ形式の文字列 Excel TimestampNTZ および Timestamp 列に適用されるセル書式指定文字列。 Excel書式構文を使用します。
version xlsx xlsxxls 書き込むExcelファイル形式のバージョン。

Json

JSON ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
compression none nonebzip2gziplz4snappydeflatezstd 書き込み時に使用する圧縮コーデック。 json (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付書式指定文字列 日付列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。
encoding UTF-8 java.nio.charset.Charset 出力ファイルの文字エンコード。 json (DataFrameWriter) に適用されます。
ignoreNullFields の値 spark.sql.jsonGenerator.ignoreNullFields truefalse JSON 出力から null 値を持つフィールドを省略するかどうか。 json (DataFrameWriter) に適用されます。
lineSep \n 文字列 レコード間で使用される行区切り文字列。 json (DataFrameWriter) に適用されます。
locale en-US java.util.Locale識別子 JSON 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与えるJavaロケール識別子。
pretty false truefalse 美しい (インデントされた複数行の) JSON 出力を有効にするかどうか。
sortKeys false truefalse 出力で JSON オブジェクトのキーをアルファベット順に並べ替えるかどうか。 確定的な出力を生成する場合に便利です。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 タイムスタンプ列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。
writeNonAsciiCharacterAsCodePoint false truefalse 出力内のリテラル UTF-8 文字ではなく、非 ASCII 文字 \uXXXX Unicode エスケープ シーケンスとしてエンコードするかどうか。

オーク

ORC ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
compression zstd noneuncompressedsnappyzliblzozstdlz4brotli 書き込み時に使用する圧縮コーデック。 orc (DataFrameWriter) に適用されます。

寄木細工

Parquet ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
compression snappy noneuncompressedsnappygziplzobrotlilz4lz4_rawzstd 書き込み時に使用する圧縮コーデック。 Parquet (DataFrameWriter) に適用されます。
spark.sql.parquet.outputTimestampType INT96 INT96TIMESTAMP_MICROSTIMESTAMP_MILLIS タイムスタンプ列のエンコードに使用される物理型。 標準のタイムスタンプ型をサポートしていない従来の Parquet リーダーとの互換性を保つには、 INT96 を使用します。

テキスト

テキスト ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
compression none nonebzip2gziplz4snappydeflatezstd 書き込み時に使用する圧縮コーデック。 テキスト ( DataFrameWriter) に適用されます。
encoding UTF-8 java.nio.charset.Charset 出力ファイルの文字エンコード。
lineSep \n 文字列 レコード間で使用される行区切り文字列。 テキスト ( DataFrameWriter) に適用されます。

Xml

XML ファイルを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
arrayElementName item 任意の文字列 明示的な名前を持たない配列要素の要素名。 xml (DataFrameWriter) に適用されます。
attributePrefix _ 任意の文字列 XML 属性に対応するフィールド名の前に付加されるプレフィックス。 xml (DataFrameWriter) に適用されます。
compression none nonebzip2gziplz4snappydeflatezstd 書き込み時に使用する圧縮コーデック。 xml (DataFrameWriter) に適用されます。
dateFormat yyyy-MM-dd 日付書式指定文字列 日付列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。
declaration version="1.0" encoding="UTF-8" standalone="yes" XML 宣言文字列、または抑制する空の文字列 各出力ファイルの先頭に書き込まれた XML 宣言文字列。 宣言を抑制する空の文字列に設定します。 xml (DataFrameWriter) に適用されます。
encoding UTF-8 java.nio.charset.Charset 出力ファイルの文字エンコード。 xml (DataFrameWriter) に適用されます。
indent 4 スペース 任意の文字列 出力内の子要素のインデントに使用される文字列。 インデントをオフにし、各行を 1 行に書き込むには、空の文字列に設定します。
locale en-US java.util.Locale識別子 XML 内の既定の日付、タイムスタンプ、および 10 進形式に影響を与えるJavaロケール識別子。
nullValue null 任意の文字列 null 値に対して書き込まれた文字列。 nullに設定すると、null フィールドの属性と子要素は省略されます。 xml (DataFrameWriter) に適用されます。
rootTag ROWS 任意の文字列 出力内のすべての行要素をラップするルート要素タグ。 xml (DataFrameWriter) に適用されます。
rowTag ROW 任意の文字列 出力内の行を表す要素タグ。 xml (DataFrameWriter) に適用されます。
singleVariantColumn None 列名の文字列 XML ファイルに書き込む 1 つの Variant 列の名前。 xml (DataFrameWriter) に適用されます。
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式の文字列 タイムスタンプ列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムスタンプ形式の文字列 タイム ゾーン列の値を含まないタイムスタンプの書式指定文字列。 xml (DataFrameWriter) に適用されます。
validateName true truefalse 列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。 xml (DataFrameWriter) に適用されます。
valueTag _VALUE 任意の文字列 属性または子要素を持つ XML 要素の文字データに使用されるフィールド名。 xml (DataFrameWriter) に適用されます。

DataStreamWriter オプション

ストリーミング書き込みを構成するには、これらのオプションと DataStreamWriter.option() を使用します。

Example

次の例では、ストリームのチェックポイントの場所を設定します。

Python
(df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table"))
Scala
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start("/path/to/table")

共通

次のオプションは、すべてのストリーミング書き込み操作に適用されます。

Key デフォルト 有効な値 Description
checkpointLocation なし (必須) パス文字列 ストリーミング クエリのチェックポイント ディレクトリへのパス。 フォールト トレランスと正確に 1 回の処理の保証に必要です。 各ストリーミング クエリでは、一意のチェックポイントの場所を使用する必要があります。 Databricks では、Unity カタログ ボリュームまたはクラウド ストレージ パスにチェックポイントを格納することをお勧めします。 「構造化ストリーミング チェックポイント」を参照してください。
path None パス文字列 Parquet などのファイル ベースのストリーミング シンクの出力パス。 ファイル ベースの形式にのみ適用されます。

コンソール シンク

コンソール シンクにストリームを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
numRows 20 正の整数 コンソール シンクに書き込むときにマイクロバッチごとに表示する行数。
truncate true truefalse 行を表示するときに長い文字列を切り捨てるかどうかを指定します。 完全な文字列値を表示するには、 false に設定します。

Delta Lake

format("delta")を使用して Delta Lake テーブルにストリームを書き込む場合は、次のオプションが適用されます。 overwriteSchemareplaceWherepartitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。

Key デフォルト 有効な値 Description
mergeSchema false truefalse ストリーミング DataFrame に新しい列が含まれている場合に Delta Lake テーブル スキーマを進化させるかどうか。 追加出力モードにのみ適用されます。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。
userMetadata None 任意の文字列 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。 DESCRIBE HISTORYの出力に表示されます。 カスタム メタデータを使用したテーブルのエンリッチに適用されます。

ファイル シンク

次のオプションは、ストリームをファイル ベースの形式 (Parquet、JSON、CSV、ORC、text) に書き込む場合に適用されます。 形式固有のオプションについては、「 DataFrameWriter オプション」を参照してください。

Key デフォルト 有効な値 Description
retention None 7 days24 hours フォールト トレランスと圧縮に使用されるシンク メタデータ ファイルを保持する期間。 設定しない場合、メタデータ ファイルは無期限に保持されます。

Kafka シンク

Kafka に書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
kafka.bootstrap.servers None host:port文字列のコンマ区切りリスト 必須。 Kafka ブローカー host:port アドレスのコンマ区切りのリスト。
topic None 任意の文字列 すべての行のターゲット Kafka トピック。 DataFrame に topic 列が含まれていない場合は必須です。
kafka.* None Kafka プロデューサーの構成 プレフィックスが付いた kafka.。 たとえば、「 kafka.compression.type 」のように入力します。

メモリ シンク

メモリ シンクにストリームを書き込む場合は、次のオプションが適用されます。

Key デフォルト 有効な値 Description
queryName なし (必須) 任意の文字列 クエリが書き込むメモリ内テーブルの名前。 メモリ シンクに必要です。 .queryName()を介して構成することもできます。
mode exactlyonce exactlyonceatleastonce メモリ シンクの配信保証。 exactlyonce は、厳密に 1 回のセマンティクスを持つマイクロバッチ モードを使用します。 atleastonce は、少なくとも 1 回のセマンティクスを持つ連続モードを使用します。

Spark 関数のオプション

一部の Spark SQL 組み込み関数は、解析またはシリアル化の動作を制御する options マップを受け入れます。 オプションを Python dict または Scala Map[String, String] として渡します。

Example

次の例では、形式が正しくないレコードを削除しながら JSON 列を解析します。

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))

アブロ

Avro 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、スキーマの進化が有効になっている Avro 列をデコードします。

Python
from pyspark.sql.functions import from_avro

df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro

val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))

さらに、 from_avroto_avro のスキーマ レジストリバリアントでは、次のオプションを受け入れます。

Key デフォルト 有効な値 Description
schemaId None スキーマ ID の整数 jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードするときに使用する Confluent スキーマ レジストリのスキーマ ID。 from_avroにのみ適用されます。
confluent.schema.registry.* None Confluent SR クライアント プロパティ値 Confluent Schema Registry クライアント構成プロパティ。 基本認証資格情報の confluent.schema.registry.basic.auth.user.info など、このプレフィックスを使用して Confluent SR クライアント プロパティを渡します。 from_avroto_avroのスキーマ レジストリバリアントに必要です。

Csv

CSV 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、カスタム区切り記号と NULL 値を含む CSV を読み取ります。

Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))

Json

JSON 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、 NULL フィールドが無視され、整形書式設定が有効になっている JSON を書き込みます。

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json

val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))

Protobuf

from_protobufto_protobuf はファイルベースのデータソースを使用しません。 Protobuf データは、これらの関数を使用して常にバイナリ列として読み書きされます。 オプションは Map[String, String] として渡され、大文字と小文字が区別されます。

Example

次の例では、PERMISSIVE モードを使用して Protobuf 列をデコードします。

Python
from pyspark.sql.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf

val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
    Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))

Protobuf 関数では、次のオプションを使用します。

Key デフォルト 有効な値 Description
mode FAILFAST FAILFASTPERMISSIVE 破損したレコードを処理する方法。 FAILFAST で例外がスローされました。 PERMISSIVE は、形式が正しくないフィールドを null に設定します。 from_protobufに適用されます。
recursive.fields.max.depth -1 (無効) 0 から 10 に変更します 再帰 Protobuf フィールドの最大再帰深度。 再帰フィールドのサポートをオフにするには、 0 に設定します。 from_protobufに適用されます。
convert.any.fields.to.json false truefalse Anyではなく、Protobuf STRUCTフィールドを JSON 文字列に変換するかどうかを指定します。 from_protobufに適用されます。
emit.default.values false truefalse ゼロまたは既定値 (proto3 セマンティクス) を持つフィールドを出力するかどうか。 falseすると、既定値のフィールドは出力から省略されます。 from_protobufに適用されます。
enums.as.ints false truefalse 列挙型フィールドを文字列の代わりに整数値としてレンダリングするかどうかを指定します。 from_protobufに適用されます。
upcast.unsigned.ints false truefalse 整数オーバーフローを防ぐためにuint32Longにアップキャストし、uint64Decimal(20,0)するかどうか。 from_protobufに適用されます。
unwrap.primitive.wrapper.types false truefalse google.protobufラッパー型 (Int32ValueStringValueなど) を対応するプリミティブ Spark 型にラップ解除するかどうか。 from_protobufに適用されます。
retain.empty.message.types false truefalse ダミー列を挿入して、空の Protobuf メッセージ型を出力スキーマに保持するかどうかを指定します。 from_protobufに適用されます。
schema.registry.subject None 任意の文字列 スキーマ レジストリのサブジェクト名。 from_protobufto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。
schema.registry.address None host:port文字列 スキーマ レジストリ アドレス (ホストとポート)。 from_protobufto_protobufのスキーマ レジストリバリアントを使用する場合に必要です。
schema.registry.protobuf.name None 任意の文字列 スキーマ レジストリのサブジェクトに複数のメッセージが含まれている場合に使用する Protobuf メッセージを指定します。 オプション。
schema.registry.schema.evolution.mode "restart" "restart""none" 受信レコードで新しいスキーマ ID が検出された場合のスキーマ変更の処理方法。 "restart" は、 UnknownFieldExceptionを使用してクエリを終了します。変更を取得できない場合に再開するようにジョブを構成します。 "none" はスキーマ ID の変更を無視し、新しいレコードを元のスキーマで解析します。
confluent.schema.registry.<option> 任意の有効な Confluent Schema Registry クライアント オプション値 プレフィックス を使用して"confluent.schema.registry" オプションを渡します。 たとえば、 "confluent.schema.registry.basic.auth.credentials.source""USER_INFO" に設定し、 "confluent.schema.registry.basic.auth.user.info""<KEY>:<SECRET>" に設定して基本認証を構成します。

Xml

XML 関数は、対応する DataFrame オプションと同じオプションを受け入れます。

Example

次の例では、カスタム ルートタグと行タグを使用して XML を書き込みます。

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml

val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))