このページでは、データの読み取りと書き込みを行う Spark API で使用できる入力オプションと出力オプションの一覧を示します。
DataFrameReader オプション
これらのオプションは、DataFrameReader.option()、DataFrameReader.options()、read_files、COPY INTO、および Auto Loader と共に使用Azure Databricksデータ ファイルの読み取り方法を制御します。
Example
次の例では、JSON ファイルを読み取るためのmultiLineにTrueを設定します。
Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
共通
次のオプションは、すべてのファイル形式に適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
ignoreCorruptFiles |
false |
true、false |
破損したファイルを無視するかどうか。 true の場合、破損したファイルが検出されても Spark ジョブは引き続き実行され、読み取られた内容は引き続き返されます。
COPY INTOでは、Delta Lake 履歴のnumSkippedCorruptFiles列にoperationMetricsとして、スキップされた破損したファイルを観察できます。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
ignoreMissingFiles |
false自動ローダーの場合は、trueのCOPY INTO (レガシ) |
true、false |
行方不明のファイルを無視するかどうかを指定します。 true の場合、不足しているファイルが検出されても Spark ジョブは引き続き実行され、内容は引き続き返されます。 Databricks Runtime 11.3 LTS 以降で使用できます。 |
modifiedAfter |
None | タイムスタンプ文字列 | 指定したタイムスタンプの後に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。 |
modifiedBefore |
None | タイムスタンプ文字列 | 指定したタイムスタンプの前に変更タイムスタンプを持つファイルのみを取り込むためのフィルターとしてのオプションのタイムスタンプ。 |
pathGlobFilter または fileNamePattern |
None | glob パターン文字列 | ファイルを選択するための可能性のある glob パターン。
PATTERN (レガシ) でのCOPY INTOに相当します。
fileNamePattern では read_files を使用できます。 |
recursiveFileLookup |
false |
true、false |
true場合、このオプションは、名前が date=2019-07-01 のようなパーティションの名前付けスキームに従っていない場合でも、入れ子になったディレクトリを検索します。 |
アブロ
Avro ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
avroSchema |
None | Avro スキーマ文字列 | Avro 形式のユーザーによって指定された省略可能なスキーマ。 Avro を読み取る場合、このオプションは互換性があるが、実際の Avro スキーマとは異なる進化したスキーマに設定できます。 逆シリアル化スキーマは、進化したスキーマと一致します。 たとえば、既定値を持つ追加の列を 1 つ含む進化したスキーマを設定した場合、読み取り結果にも新しい列が含まれます。 |
avroSchemaEvolutionMode |
none |
none、restart |
スキーマ レジストリを使用するときにスキーマの進化を処理する方法。
none はスキーマの変更を無視し、ジョブを続行します。
restart では、スキーマの変更が検出され、ジョブの再起動が必要な場合に UnknownFieldException が発生します。 |
datetimeRebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 |
enableStableIdentifiersForUnionType |
false |
true、false |
Avro 共用体型に安定したフィールド名を使用するかどうか。 有効にすると、共用体の型フィールド名は、小文字の型名から派生します (たとえば、 member_int、 member_string)。 2 つの型名が小文字の後で同一の場合、例外をスローします。 |
mergeSchema |
false |
true、false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 Avro に対して mergeSchema を有効にしても、データ型は緩和されません。 |
mode |
FAILFAST |
FAILFAST、PERMISSIVE、DROPMALFORMED |
破損したレコードを処理するためのパーサー モード。
FAILFAST で例外がスローされました。
PERMISSIVE は、形式が正しくないフィールドを null に設定します。
DROPMALFORMED 不正なレコードが自動的に削除されます。 |
readerCaseSensitive |
true |
true、false |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
recursiveFieldMaxDepth |
None |
0 から 15 に変更します |
再帰 Avro フィールドの最大再帰深度。 すべての再帰フィールドを切り捨てる 1 に設定し、1 レベルの再帰を許可する 2 など、最大 15に設定します。 設定解除または 0場合、再帰フィールドは許可されません。 |
rescuedDataColumn |
None | 列名の文字列 | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。詳細については、「復旧されたデータ列とは」を参照してください。 |
stableIdentifierPrefixForUnionType |
member_ |
任意の文字列 |
enableStableIdentifiersForUnionType=trueするときに、安定した共用体型のフィールド名に使用するプレフィックス。 |
Csv
CSV ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
badRecordsPath |
None | パス文字列 | 不正な CSV レコードに関する情報を記録するためのファイルを格納するパス。 |
charToEscapeQuoteEscaping |
\0 |
1 文字 | 引用符のエスケープに使用する文字をエスケープするために使用する文字。 たとえば、レコードが [ " a\\", b ] の場合は次のようになります。
|
columnNameOfCorruptRecord |
_corrupt_record |
列名の文字列 | 自動ローダーがサポートされています。
COPY INTO (レガシ) ではサポートされていません。形式に誤りがあり、解析できないレコードを格納するための列。 解析の mode を DROPMALFORMED に設定する場合、この列は空になります。 |
comment |
\0 |
1 文字 | テキスト行の先頭に配置した場合に行コメントを表す文字を定義します。 コメントのスキップを無効にするには、'\0' を使用します。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | 日付文字列を解析するための形式。 |
emptyValue |
空の文字列 | 任意の文字列 | 空の値の文字列表現。 |
enableDateTimeParsingFallback |
false |
true、false |
指定した形式で値を解析できない場合に、従来の日付とタイムスタンプの解析動作にフォールバックするかどうかを指定します。
falseすると、解析エラーが発生するか、modeに応じて null が生成されます。 |
encoding または charset |
UTF-8 |
java.nio.charset.Charset名 |
CSV ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。
UTF-16 が UTF-32 の場合、multiline と true を使用することはできません。 |
enforceSchema |
true |
true、false |
指定または推論されたスキーマを CSV ファイルに強制的に適用するかどうか。 このオプションを有効にすると、CSV ファイルのヘッダーは無視されます。 自動ローダーを使用してデータをレスキューし、スキーマの展開を許可する場合、このオプションは既定では無視されます。 |
escape |
\ |
1 文字 | データの解析時に使用するエスケープ文字。 |
extension |
csv |
ファイル拡張子文字列 | 読み取りに必要なファイル名拡張子。 この拡張子のないファイルは除外されます。 |
failOnUnknownFields |
false |
true、false |
CSV レコードにスキーマに存在しない列が含まれている場合に失敗するかどうか。
falseすると、認識されない列は、rescuedDataColumnに応じて自動的に削除または救助されます。 |
failOnWidenedFields |
false |
true、false |
フィールド値を、拡大せずに宣言されたスキーマ型として解析できない場合に失敗するかどうか。
falseすると、rescuedDataColumnに応じて、型が拡大された値が自動的に復旧されます。
failOnUnknownFields=true設定すると、このオプションの効果をマスクできます。 |
header |
false |
true、false |
CSV ファイルにヘッダーが含まれているかどうか。 自動ローダーによって、スキーマの推論時にファイルにヘッダーが含まれているものと見なされます。 |
ignoreLeadingWhiteSpace |
false |
true、false |
解析対象の各値の先頭の空白文字を無視するかどうか。 |
ignoreTrailingWhiteSpace |
false |
true、false |
解析対象の各値の末尾の空白文字を無視するかどうか。 |
inferSchema |
false |
true、false |
解析対象の CSV レコードのデータ型を推論するか、すべての列が StringType であると見なすか。
true に設定した場合は、追加でデータを渡す必要があります。 自動ローダーの場合は、代わりに cloudFiles.inferColumnTypes を使います。 |
inputBufferSize |
1048576 (1 MB) |
正の整数 | CSV パーサーのバッファー サイズ (バイト単位)。 大きな CSV ファイルを解析するときにメモリ使用量を調整する場合に便利です。 |
lineSep |
なし。 \r、 \r\n、および \n |
文字列 | 連続する 2 つの CSV レコードの間の文字列。 |
locale |
US |
java.util.Locale識別子 |
CSV 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与える、識別されたJavaロケール。 |
maxCharsPerColumn |
-1 |
正の整数、または無制限の-1 |
解析する値の予想最大文字数。 メモリ エラーを回避するために使用できます。 既定値は -1 で、無制限を意味します。 |
maxColumns |
20480 |
正の整数 | レコードに含めることができる列数のハード制限。 |
mergeSchema |
false |
true、false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 スキーマの推論時に、自動ローダーに対して既定で有効になります。 |
mode |
PERMISSIVE |
PERMISSIVE、DROPMALFORMED、FAILFAST |
形式に誤りがあるレコードの処理に関するパーサーのモード。 |
multiLine |
false |
true、false |
CSV レコードが複数の行にまたがるかどうか。 |
nanValue |
NaN |
任意の文字列 |
FloatType および DoubleType 列を解析する際の非数値の文字列表現。 |
negativeInf |
-Inf |
任意の文字列 |
FloatType または DoubleType 列を解析する際の負の無限大の文字列表現。 |
nullValue |
空の文字列 | 任意の文字列 | null 値の文字列表現。 |
parserCaseSensitive (非推奨) |
false |
true、false |
ファイルの読み取り中に、ヘッダーに宣言されている列をスキーマの大文字と小文字の区別に合わせるかどうか。 自動ローダーについては、これは既定で true となります。 有効にすると、大文字と小文字が異なる列は rescuedDataColumn に救出されます。
readerCaseSensitive が優先されるため、このオプションは非推奨となりました。 |
positiveInf |
Inf |
任意の文字列 |
FloatType または DoubleType 列を解析する際の正の無限大の文字列表現。 |
preferDate |
true |
true、false |
可能な場合、タイムスタンプではなく日付として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。 |
quote |
" |
1 文字 | フィールド区切り記号が値に含まれる場合に、値のエスケープに使用する文字。 |
readerCaseSensitive |
true |
true、false |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | 列名の文字列 | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
sep または delimiter |
, |
文字列 | 列の間の区切り文字列。 |
singleVariantColumn |
None | 列名の文字列 | 列名に設定すると、各フィールドを独自の列に解析するのではなく、CSV レコード全体をその名前の単一の VariantType 列に読み取ります。
header=true が必要です。 |
skipRows |
0 |
正の整数または 0 |
コメントされた行や空の行を含む、無視する必要がある CSV ファイルの先頭からの行数。
header が true の場合、ヘッダーは最初にスキップされていない行とコメントされていない行になります。 |
timeFormat |
HH:mm:ss |
時刻書式指定文字列 | 列の値 TimeType 解析するための形式。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | タイムスタンプ文字列を解析するための形式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。 |
timeZone |
None |
java.time.ZoneId文字列 |
タイムスタンプと日付を解析するときに使用する java.time.ZoneId。 |
unescapedQuoteHandling |
STOP_AT_DELIMITER |
STOP_AT_CLOSING_QUOTE、BACK_TO_DELIMITER、STOP_AT_DELIMITER、SKIP_VALUE、RAISE_ERROR |
エスケープされていない引用符を処理するための方策。 許可される各オプションの動作は次のとおりです。
|
Excel
Excelファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
dataAddress |
None | セル範囲またはシート名の文字列 | Excel構文で読み取るセル範囲。 省略した場合は、最初のシートからすべての有効なセルを読み取ります。
SheetName!C5:H10を使用して、名前付きシートから範囲を読み取るか、最初のシートから範囲を読み取C5:H10するか、特定のシートからすべてのデータを読み取SheetNameします。 |
headerRows |
0 |
0、1 |
列名ヘッダーとして使用する初期行の数。
dataAddressを指定すると、セル範囲内に適用されます。
0すると、列名は_c1、_c2、_c3などとして自動生成されます。 |
ignoreMissingSheet |
false |
true、false |
dataAddressで指定されたシートを含まないファイルをサイレント スキップするかどうかを指定します。
falseすると、ファイルに要求されたシートがない場合にエラーがスローされます。 シート名が dataAddressで指定されている場合にのみ適用されます。 |
includePhoneticRuns |
false |
true、false |
XLSX ファイルを読み取るときに、セル文字列値に連結されたふりがな (pinyin やふりがななど) 発音注釈を含めるかどうか。 |
operation |
readSheet |
readSheet、listSheets |
Excel ブックに対して実行する操作。
readSheet はシートからデータを読み取ります。
listSheets は、各シートのフィールド sheetIndex: long および sheetName: String を持つ構造体を返します。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | Excelに文字列として格納されるタイムスタンプなしのタイムゾーン値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 |
Dateとして読み取られた文字列値のカスタム書式指定文字列。 カスタム日付形式は Datetime パターンの形式に従います。 |
Json
JSON ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
allowBackslashEscapingAnyCharacter |
false |
true、false |
バックスラッシュを使用して、後続の任意の 1 文字をエスケープすることを許可するかどうか。 有効にしない場合は、JSON の仕様に明示されている文字のみをエスケープできます。 |
allowComments |
false |
true、false |
解析対象のコンテンツ内で Java、C、および C++ スタイルのコメント ('/'、'*'、および '//' の種類) の使用を許可するかどうか。 |
allowNonNumericNumbers |
true |
true、false |
非数値 (NaN) トークンのセットを有効な浮動小数点数値として許可するかどうか。 |
allowNumericLeadingZeros |
false |
true、false |
追加の (無視できる) ゼロで始まる整数値を許可するかどうか (例: 000001)。 |
allowSingleQuotes |
true |
true、false |
単一引用符 (アポストロフィ、'\' 文字) を使用して、文字列 (名前と文字列値) を囲むことを許可するかどうか。 |
allowUnquotedControlChars |
false |
true、false |
JSON 文字列に、エスケープされていない制御文字 (タブや改行文字など、値が 32 未満の ASCII 文字) を含めることを許可するかどうか。 |
allowUnquotedFieldNames |
false |
true、false |
引用符で囲まれていないフィールド名の使用を許可するかどうか。これは JavaScript では許可されますが、JSON 仕様では許可されません。 |
alternateVariantEncoding |
None | Z85 |
ソース JSON の Variant 値に使用されるエンコード。 インライン JSON として格納されるのではなく、Base85 でエンコードされた Variant 値をデコードするには、 Z85 に設定します。 |
badRecordsPath |
None | パス文字列 | 不正な JSON レコードに関する情報を記録するためのファイルを格納するパス。 ファイル ベースのデータ ソースで badRecordsPath オプションを使用する場合、次の制限があります。
|
columnNameOfCorruptRecord |
_corrupt_record |
列名の文字列 | 形式に誤りがあり、解析できないレコードを格納するための列。 解析の mode を DROPMALFORMED に設定する場合、この列は空になります。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | 日付文字列を解析するための形式。 |
dropFieldIfAllNull |
false |
true、false |
スキーマの推論中に、すべて null 値の列または空の配列および構造体を無視するかどうか。 |
encoding または charset |
UTF-8 |
java.nio.charset.Charset名 |
JSON ファイルのエンコードの名前。 オプションの一覧については、java.nio.charset.Charset を参照してください。
UTF-16 が UTF-32 の場合、multiline と true を使用することはできません。 |
inferTimestamp |
false |
true、false |
タイムスタンプ文字列を TimestampType として推論を試みるかどうか。
trueに設定すると、スキーマの推論に著しく長い時間がかかる場合があります。 自動ローダーで使うには cloudFiles.inferColumnTypes を有効にする必要があります。 |
lineSep |
なし。 \r、 \r\n、および \n |
文字列 | 連続する 2 つの JSON レコードの間の文字列。 |
locale |
US |
java.util.Locale識別子 |
JSON 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与えるJavaロケール識別子。 |
maxNestingDepth |
500 |
正の整数 | JSON オブジェクトと配列で許容される入れ子の深さの最大値。 深く入れ子になったドキュメントの場合は、この値を大きくします。 |
maxNumLen |
1000 |
正の整数 | JSON 入力内のトークン数の最大長。 大きな数値リテラルを含む JSON の場合は、この値を増やします。 |
maxStringLen |
無制限 | 正の整数 | JSON 入力内の文字列値の最大長。 大きな文字列で JSON を解析するときのメモリ使用量を制限するように設定します。 |
mode |
PERMISSIVE |
PERMISSIVE、DROPMALFORMED、FAILFAST |
形式に誤りがあるレコードの処理に関するパーサーのモード。 |
multiLine |
false |
true、false |
JSON レコードが複数の行にまたがるかどうか。 |
prefersDecimal |
false |
true、false |
可能な場合は float 型や double 型の代わりに DecimalType として文字列を推論しようとします。 また、 inferSchema を有効にするか、自動ローダーで cloudFiles.inferColumnTypes を使用して、スキーマ推論を使用する必要があります。 |
primitivesAsString |
false |
true、false |
数値やブール値などのプリミティブ型を StringType として推論するかどうか。 |
readerCaseSensitive |
true |
true、false |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 Databricks Runtime 13.3 以降で使用できます。 |
rescuedDataColumn |
None | 列名の文字列 | データ型の不一致またはスキーマの不一致 (列の大文字と小文字の区別を含む) によって解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
singleVariantColumn |
None | 列名の文字列 | JSON ドキュメント全体を取り込むかどうか。列の名前として指定された文字列を持つ単一の Variant 列に解析されます。 設定されていない場合、JSON フィールドは独自の列に取り込まれます。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | タイムスタンプ文字列を解析するための形式。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイム ゾーン (TimestampNTZType) 文字列を含まないタイムスタンプを解析するための形式。 |
timeZone |
None |
java.time.ZoneId文字列 |
タイムスタンプと日付を解析するときに使用する java.time.ZoneId。 |
upgradeExceptionAsBadRecord |
false |
true、false |
型アップグレード例外 (たとえば、宣言された列型に値を拡大できない場合) を、例外をスローするのではなく、無効なレコードとして扱うかどうか。 |
カフカ
Kafka リーダー オプションの完全な一覧については、「 DataStreamReader Kafka オプション」を参照してください。 次のオプションは、 spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
endingOffsets |
latest |
latest、または JSON オフセット文字列 |
読み取りを停止する場所。 JSON 文字列では、 -1 は最新のオフセットです。
-2(最も早いオフセット) は、終了オフセットとして使用できません。 JSON オフセット文字列の例を次に示します: {"topicA":{"0":50,"1":-1}}。 |
endingOffsetsByTimestamp |
None | JSON タイムスタンプ文字列 | タイムスタンプとして指定されたパーティションごとの終了オフセット (ミリ秒単位)。 たとえば、 {"topicA":{"0":2000,"1":3000}}と指定します。 |
endingTimestamp |
None | 正の整数または 0 |
すべてのパーティションに適用されるグローバル終了タイムスタンプ (ミリ秒単位)。 |
オーク
ORC ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
mergeSchema |
false |
true、false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 |
寄木細工
Parquet ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
datetimeRebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
ユリウス暦と予期的グレゴリオ暦の間の日付値とタイムスタンプ値のリベースを制御します。 |
int96RebaseMode |
LEGACY |
EXCEPTION、LEGACY、CORRECTED |
ユリウス暦と予期的グレゴリオ暦の間の INT96 タイムスタンプ値のリベースを制御します。 |
mergeSchema |
false |
true、false |
複数のファイル全体でスキーマを推論するか、各ファイルのスキーマをマージするかどうか。 |
readerCaseSensitive |
true |
true、false |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | 列名の文字列 | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) が原因で解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
状態ストア
構造化ストリーミング状態データを読み取るために、 spark.read.format("statestore") またはテーブル値関数 read_statestore これらのオプションを使用します。 「構造化ストリーミング状態情報の読み取り」をご覧ください。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
batchId |
最新のバッチ ID | 正の整数または 0 |
読み取り対象のバッチ。 クエリの以前の状態を照会するために使用します。 バッチはコミットされるべきですが、まだクリーンアップされていません。 |
operatorId |
0 |
正の整数または 0 |
読み取り対象の演算子。 クエリに複数のステートフル演算子がある場合に使用します。 |
storeName |
DEFAULT |
任意の文字列 | 読み取り対象の状態ストア名。 ステートフル 演算子に複数の状態ストア インスタンスがある場合に使用します。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 |
joinSide |
None |
left、right |
ストリーム ストリーム結合の読み取り先となるターゲット側。 ストリーム ストリーム結合には storeName または joinSide を指定する必要がありますが、両方を指定する必要はありません。 |
snapshotStartBatchId |
None | 正の整数または 0 |
状態を読み取るときに開始点として使用するスナップショットのバッチ ID。 リーダーは、このスナップショットからの変更を batchIdまで再生することで、状態を再構築します。 スナップショットが破損している場合に便利です。
snapshotPartitionIdと共に指定する必要があります。
readChangeFeedでは使用できません。 変更ログのチェックポイント処理が有効になっている HDFS ベースの状態ストアと RocksDB 状態ストアをサポートします。 Databricks Runtime 15.4 LTS 以降で使用できます。 |
snapshotPartitionId |
None | 正の整数または 0 |
指定した場合、クエリはこのパーティションのみを読み取ります。
snapshotStartBatchIdと共に指定する必要があります。
readChangeFeedでは使用できません。 Databricks Runtime 15.4 LTS 以降で使用できます。 |
readChangeFeed |
false |
true、false |
trueすると、changeStartBatchIdとchangeEndBatchIdの間のバッチの指定された範囲にわたって状態の変化を返します。
changeStartBatchId が必要です。
joinSide、batchId、snapshotStartBatchId、またはsnapshotPartitionIdでは使用できません。 Databricks Runtime 16.4 LTS 以降で使用できます。詳細については、「 構造化ストリーミング状態の変更の読み取り」を参照してください。 |
changeStartBatchId |
None | 正の整数または 0 |
変更フィード範囲の開始バッチ ID。
readChangeFeed が true の場合に必要です。
readChangeFeedが true に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 |
changeEndBatchId |
最新のバッチ ID | 正の整数または 0 |
変更フィード範囲の終了バッチ ID。
changeStartBatchId以上である必要があります。
readChangeFeedが true に設定されている場合にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 |
stateVarName |
None | 任意の文字列 | 読み取る状態変数名。 状態変数名は、init 演算子によって使用されるStatefulProcessorのtransformWithState関数内の各変数の一意の名前です。
transformWithState演算子を使用する場合は必須です。 Databricks Runtime 16.4 LTS 以降で使用できます。 |
readRegisteredTimers |
false |
true、false |
trueすると、transformWithState演算子によって使用される登録済みタイマーを読み取ります。
transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 |
flattenCollectionTypes |
true |
true、false |
trueすると、マップとリストの状態変数に対して返されるレコードがフラット化されます。
falseすると、Spark SQL ArrayまたはMapとしてレコードが返されます。
transformWithState演算子にのみ適用されます。 Databricks Runtime 16.4 LTS 以降で使用できます。 |
テキスト
テキスト ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
encoding |
UTF-8 |
java.nio.charset.Charset名 |
テキスト ファイルの行区切り記号のエンコードの名前。 ファイルの内容はこのオプションの影響を受けず、as-is読み取られます。 |
lineSep |
なし。 \r、 \r\n 、および \n |
文字列 | 連続する 2 つのテキスト レコード間の文字列。 |
wholeText |
false |
true、false |
ファイルを単一レコードとして読み取るかどうか。 |
Xml
XML ファイルを読み取る場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
rowTag |
None | 任意の文字列 | 行として扱う XML ファイルの行タグ。 XML <book> <page><page>...<book> の例では、適切な値は page です。 これは必須オプションです。 |
samplingRatio |
1.0 |
0.0 から 1.0 に変更します |
スキーマ推論に使用される行の割合を定義します。 XML 組み込み関数はこのオプションを無視します。 |
excludeAttribute |
false |
true、false |
要素内の属性を除外するかどうか。 |
mode |
None |
PERMISSIVE、DROPMALFORMED、FAILFAST |
解析中に破損したレコードを処理するモードを許可します。
|
inferSchema |
true |
true、false |
true の場合は、結果として得られる各データフレーム列に対して適切な型を推論しようとします。
false の場合、結果の列はすべて string 型です。 XML 組み込み関数はこのオプションを無視します。 |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
列名の文字列 |
PERMISSIVE モードで作成された形式の正しくない文字列を含む新しいフィールドの名前を変更できます。 |
attributePrefix |
None | 任意の文字列 | 属性と要素を区別するための属性のプレフィックス。 これはフィールド名のプレフィックスになります。 既定値は _ です。 XML の読み取り時は空にすることができますが、書き込み時は空にすることはできません。
DataFrameWriter XML オプションにも適用されます。 |
valueTag |
_VALUE |
任意の文字列 | 属性または子要素の要素も持つ要素内の文字データに使用されるタグ。 ユーザーがスキーマで valueTag フィールドを指定することもできますが、文字データが他の要素や属性と一緒に要素に存在する場合、スキーマ推論中に自動的に追加されます。
DataFrameWriter XML オプションにも適用されます。 |
encoding |
UTF-8 |
java.nio.charset.Charset名 |
読み取りの場合は、指定されたエンコードの種類で XML ファイルをデコードします。 書き込みの場合は、保存される XML ファイルのエンコード (文字セット) を指定します。 XML 組み込み関数はこのオプションを無視します。 DataFrameWriter XML オプションにも適用されます。 |
ignoreSurroundingSpaces |
true |
true、false |
値を囲む空白をスキップする必要があるかどうか。 空白のみの文字データは無視されます。 |
rowValidationXSDPath |
None | ファイル パス文字列 | 各行の省略可能な XML を個別に検証するために使用される XSD ファイルへのパス。 検証に失敗した行は、解析エラーのように扱われます。 XSD は、指定されているか推論されたかに関係なく、スキーマには影響しません。 |
ignoreNamespace |
false |
true、false |
true場合、XML 要素と属性に対する名前空間のプレフィックスは無視されます。 たとえば、タグ <abc:author> と <def:author> は、どちらも単なる <author> として扱われます。
rowTag 要素では名前空間を無視することはできないため、その子要素の読み込みのみを取り扱うことが可能です。
false の場合でも、XML 解析は名前空間を認識しません。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | カスタムタイムスタンプの形式文字列であり、datetime パターンに従います。 これは timestamp 型に適用されます。
DataFrameWriter XML オプションにも適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイムゾーンを含まないタイムスタンプのカスタム形式の文字列で、datetime パターン形式に従っています。 これは TimestampNTZType 型に適用されます。 DataFrameWriter XML オプションにも適用されます。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | カスタム日付形式の文字列であり、datetime パターン形式に従います。 これは、日付型に適用されます。 DataFrameWriter XML オプションにも適用されます。 |
locale |
en-US |
IETF BCP 47 言語タグ | IETF BCP 47 形式の言語タグとしてロケールを設定します。 たとえば、locale は日付とタイムスタンプの解析中に使用されます。 |
nullValue |
糸 null |
任意の文字列 | null 値の文字列表記を設定します。 これが null である場合、パーサーはフィールドの属性と要素を書き込みません。
DataFrameWriter XML オプションにも適用されます。 |
readerCaseSensitive |
true |
true、false |
rescuedDataColumn が有効な場合、大文字と小文字の区別の動作を指定します。 true の場合は、名前がスキーマと大文字と小文字が異なるデータ列を復旧します。 false の場合は、大文字と小文字を区別せずにデータを読み取る。 |
rescuedDataColumn |
None | 列名の文字列 | データ型の不一致とスキーマの不一致 (列の大文字と小文字の区別を含む) のために解析できないすべてのデータを別の列に収集するかどうか。 自動ローダーを使用する場合、この列は既定で含まれます。 詳細については、「復旧されたデータ列とは」を参照してください。
COPY INTO (レガシ) では、COPY INTOを使用してスキーマを手動で設定できないため、復旧されたデータ列はサポートされません。 Databricks では、ほとんどのインジェスト シナリオで自動ローダーを使用することをお勧めします。 |
singleVariantColumn |
none |
列名の文字列 | 1 つのバリアント列の名前を指定します。 このオプションが読み取り用に指定されている場合は、指定されたオプション文字列値を列の名前として使用して、XML レコード全体を 1 つの Variant 列に解析します。 このオプションを書き込みに指定した場合は、1 つの Variant 列の値を XML ファイルに書き込みます。 DataFrameWriter XML オプションにも適用されます。 |
useLegacyXMLParser |
true |
true、false |
レガシ XML パーサーを使用するかどうか。 レガシ パーサーでは、形式が正しくないコンテンツに対する検証の厳格さが低くなりますが、メモリ効率は低くなります。 より厳密な既定のパーサーをオプトインするには、 false に設定します。 |
wildcardColName |
xs_any |
列名の文字列 | ワイルドカード (xs:any) スキーマ要素に一致する XML 要素をキャプチャするために使用される列名。
rescuedDataColumnと一緒に使用することはできません。 |
DataStreamReader オプション
これらのオプションを DataStreamReader.option() と共に使用して、Delta Lake テーブルやその他のファイル ベースのソースからのストリーミング読み取りを構成します。
ファイル形式のオプション (JSON、CSV、Parquet など) については、「 DataFrameReader オプション」を参照してください。
自動ローダー (cloudFiles.*) オプションについては、「 自動ローダー」を参照してください。
Example
次の例では、Delta Lake テーブル ストリームのmaxFilesPerTriggerに10を設定します。
Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
共通
Delta Lake テーブルとその他のファイル ベースのストリーミング ソースには、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cleanSource |
off |
off、delete、archive |
ソース ファイルがストリームによって処理された後で処理する方法。
off は何のアクションも実行しません。
delete ソース ファイルを完全に削除します。
archive は、ファイルを sourceArchiveDirに移動します。
archiveに設定する場合は、sourceArchiveDirも設定する必要があります。 Delta Lake テーブル ストリーミングには適用されません。 |
fileNameOnly |
false |
true、false |
既に処理されているファイルを、完全なパスではなくファイル名のみで識別するかどうか。
trueすると、同じファイル名を持つ異なるパスにあるファイルは同じファイルとして扱われ、再処理されません。 Delta Lake テーブル ストリーミングには適用されません。 |
latestFirst |
false |
true、false |
各マイクロバッチ内で、最後に変更されたファイルを最初に処理するかどうか。 可能な限り迅速に最新のデータを処理する場合に便利です。
trueとmaxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合、maxFileAgeは無視されます。 Delta Lake テーブル ストリーミングには適用されません。 |
maxBytesPerTrigger |
None | 正の整数 | マイクロバッチごとに処理されるデータ量のソフト最大値。 最小の入力ユニットが上限を超えると、バッチが制限を超えて処理される場合があります。
maxFilesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。自動ローダーの場合は、代わりに cloudFiles.maxBytesPerTrigger を使います。
「共通」を参照してください。 |
maxCachedFiles |
10000 |
正の整数または 0 |
後続のマイクロバッチ用にキャッシュする未処理のファイルの最大数。 キャッシュをオフにするには、 0 に設定します。 ソース ディレクトリにトリガーごとに多数の新しいファイルが含まれている場合は、この値を大きくします。 Delta Lake テーブル ストリーミングには適用されません。 |
maxFileAge |
7d |
7dや4h |
現在のシステム時刻ではなく、最近変更されたファイルのタイムスタンプを基準とした、処理対象と見なされるファイルの最大有効期間。 このしきい値より古いファイルは無視されます。
latestFirstがtrueされ、maxFilesPerTriggerまたはmaxBytesPerTriggerが設定されている場合は無視されます。 Delta Lake テーブル ストリーミングには適用されません。 |
maxFilesPerTrigger |
1000 Delta Lake と自動ローダー用。 その他のファイル ベースのソースに対する最大値はありません。 |
正の整数 | 各マイクロバッチで処理される新しいファイルの数の上限。
maxBytesPerTriggerと共に使用すると、マイクロバッチは、いずれかの制限に最初に達するまでデータを処理します。自動ローダーの場合は、代わりに cloudFiles.maxFilesPerTrigger を使います。
「共通」を参照してください。 |
sourceArchiveDir |
None | パス文字列 |
cleanSourceが archive に設定されている場合のアーカイブ ディレクトリへのパス。 ソース ファイルは、処理後にこのパスに移動され、相対ディレクトリ構造が維持されます。 Delta Lake テーブル ストリーミングには適用されません。 |
自動ローダー
これらのオプションを cloudFiles ソースと共に使用して、クラウド ストレージからのインジェストをストリーミングするように 自動ローダー を構成します。
cloudFiles ソースに固有のオプションには、他のcloudFiles ソース オプションとは別の名前空間に保持するために、が付いています。
共通
次のオプションは、すべての自動ローダー構成に適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cloudFiles.allowOverwrites |
false |
true、false |
入力ディレクトリ ファイルの変更による既存のデータの上書きを許可するかどうか。 構成に関する注意事項については、「 ファイルが追加または上書きされたときに、自動ローダーによってファイルが再び処理されますか?」を参照してください。 |
cloudFiles.backfillInterval |
None |
1 dayや1 week |
自動ローダーは、特定の間隔で非同期バックフィルをトリガーできます。 詳細については、「 cloudFiles.backfillInterval を使用して通常のバックフィルをトリガーする」を参照してください。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.cleanSource |
OFF |
OFF、DELETE、MOVE |
処理されたファイルを自動的に削除するか、入力ディレクトリから移動するか。
OFF (既定値) に設定すると、ファイルは削除されません。DELETEに設定すると、自動ローダーは処理されてから 30 日後に自動的にファイルを削除します。 これを行うには、自動ローダーにソース ディレクトリへの書き込みアクセス許可が必要です。MOVEに設定すると、自動ローダーは、ファイルが処理されてから 30 日後cloudFiles.cleanSource.moveDestination指定した場所に自動的に移動します。 これを行うには、自動ローダーには、ソース ディレクトリと移動場所に対する書き込みアクセス許可が必要です。ファイルは、 commit_timeテーブル値関数の結果でcloud_files_stateに null 以外の値がある場合に処理されたと見なされます。
テーブル値関数cloud_files_state参照してください。 処理後の 30 日間の追加待機は、 cloudFiles.cleanSource.retentionDurationを使用して構成できます。cloudFiles.cleanSourceを有効にする前に、次の考慮事項を確認してください。
Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.cleanSource.retentionDuration |
30 days |
calendarInterval 文字列 (14 days、2 weeks、または1 month |
処理されたファイルが cleanSourceを使用したアーカイブの候補になるまでの待機時間。
DELETEの場合は 7 日を超える必要があります。
MOVEの最小制限はありません。Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.cleanSource.moveDestination |
None | クラウド ストレージまたは Unity カタログのボリューム パス |
cloudFiles.cleanSourceがMOVEに設定されている場合に処理されたファイルをアーカイブするパス。 クラウド ストレージ パスまたは Unity カタログ ボリューム パス ( /Volumes/my_catalog/my_schema/my_volume/archive/ など) を指定できます。移動場所は次の条件を満たす必要があります。
自動ローダーには、このディレクトリへの書き込みアクセス許可が必要です。 Databricks Runtime 16.4 以降で使用できます。 |
cloudFiles.format |
なし (必須オプション) |
avro、 binaryFile、 csv、 json、 orc、 parquet、 text、 xml |
ソース パスのデータ ファイル形式。 有効な値は次のとおりです。 |
cloudFiles.includeExistingFiles |
true |
true、false |
ストリーム処理入力パスに既存のファイルを含めるか、初期セットアップ後に到着した新しいファイルのみを処理するか。 このオプションは、初めてストリームを開始するときにのみ評価されます。 ストリームの再起動後にこのオプションを変更した場合、効果はありません。 |
cloudFiles.inferColumnTypes |
false |
true、false |
スキーマの推論を利用するときに、正確な列型を推論するかどうか。 既定では、列は JSON および CSV データセットを推論するときに文字列として推論されます。 詳細については、スキーマの推論に関する説明を参照してください。 |
cloudFiles.maxBytesPerTrigger |
None | 次のようなバイト文字列 10g |
各トリガーで処理される新しいバイトの最大数。 これはソフト最大値です。 それぞれ 3 GB のファイルがある場合、Azure Databricksはマイクロバッチで 12 GB を処理します。 個々のファイルがマイクロバッチに分割されることはありません。サイズがこの制限を超えた場合でも、常に 1 つの範囲内で完全に処理されます。
cloudFiles.maxFilesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。 このオプションは、Trigger.Once() (Trigger.Once() は非推奨) と共に使用しても無効です。Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。 |
cloudFiles.maxFileAge |
None | 期間文字列 | 重複排除を目的としてファイル イベントを追跡する期間。 Databricks では、1 時間に数百万のファイルの順序でデータを取り込む場合でない限り、このパラメーターのチューニングは推奨しません。 詳細については、 ファイル イベントの追跡 に関するセクションを参照してください。cloudFiles.maxFileAge のチューニングがアグレッシブすぎると、重複取り込みやファイル欠如など、データ品質の問題を引き起こすことがあります。 そのため、Databricks は cloudFiles.maxFileAge に 90 日間などの控えめな設定を推奨しています。同等のデータ インジェスト ソリューションもこのくらいを推奨しています。 |
cloudFiles.maxFilesPerTrigger |
1000 |
正の整数 | 各トリガーで処理される新しいファイルの最大数。
cloudFiles.maxBytesPerTrigger と使用すると、Azure Databricks では、cloudFiles.maxFilesPerTrigger または cloudFiles.maxBytesPerTrigger の下限のうち、最初に到達した方までを消費します。
Trigger.Once() (非推奨) と一緒に使用すると、このオプションは無効です。Databricks Runtime 18.0 以降では、このオプションは動的に構成され、手動で設定する必要はありません。 |
cloudFiles.partitionColumns |
None | 列名のコンマ区切りリスト | ファイルのディレクトリ構造から推論する Hive スタイルのパーティション列のコンマ区切りの一覧。 Hive スタイルのパーティション列は、 <base-path>/a=x/b=1/c=y/file.formatなどの等値記号で結合されたキーと値のペアです。 この例では、パーティション列は a、b、c です。 既定では、スキーマ推論を使用し、データを読み込む <base-path> を指定している場合、これらの列は自動的にスキーマに追加されます。 スキーマを指定した場合、自動ローダーでは、これらの列がスキーマに含まれることが想定されます。 これらの列をスキーマの一部に含めない場合は、"" を指定して、これらの列を無視することができます。 さらに、下の例のような複雑なディレクトリ構造のファイル パスから列を推論するときに、このオプションを使用できます。<base-path>/year=2022/week=1/file1.csv<base-path>/year=2022/month=2/day=3/file2.csv<base-path>/year=2022/month=2/day=4/file3.csvcloudFiles.partitionColumnsとしてyear,month,dayを指定すると、year=2022のfile1.csvが返されますが、month列とday列はnull。month と day は、 file2.csv と file3.csvに対して正しく解析されます。 |
cloudFiles.schemaEvolutionMode |
addNewColumns スキーマが指定されていない場合は none それ以外の場合 |
addNewColumns、none、rescue、failOnNewColumns |
新しい列がデータで検出された場合にスキーマを展開するモード。 既定では、列は JSON データセットを推論するときに文字列として推論されます。 詳細については、スキーマの展開に関する説明を参照してください。 |
cloudFiles.schemaHints |
None | スキーマ文字列 | スキーマ推論中に自動ローダーに指定するスキーマ情報。 詳細については、スキーマ ヒントに関するページを参照してください。 |
cloudFiles.schemaLocation |
なし (スキーマを推論するために必要) | パス文字列 | 推論されたスキーマとそれ以降の変更を保存する場所。 詳細については、スキーマの推論に関する説明を参照してください。 |
cloudFiles.useStrictGlobber |
false |
true、false |
Apache Spark の他のファイル ソースの既定のグロビング動作に一致する厳密な globber を使用するかどうか。 詳細については、「一般的なデータ読み込みパターン」を参照してください。 Databricks Runtime 12.2 LTS 以降で使用できます。 |
cloudFiles.validateOptions |
true |
true、false |
自動ローダー オプションを検証し、不明なオプションまたは一貫性のないオプションに対してエラーを返すかどうか。 |
ディレクトリの一覧
ディレクトリ一覧モードを使用する場合は、次のオプションが適用されます。
ファイル通知
必要なクラウドのアクセス許可、セットアップ手順、認証方法など、ファイル通知モードの構成については、「 ファイル通知モードでの自動ローダー ストリームの構成」を参照してください。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cloudFiles.fetchParallelism |
1 |
正の整数 | キュー サービスからメッセージを取得するときに使用するスレッドの数。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.pathRewrites |
None | JSON マップ文字列 | 複数の S3 バケットからファイル通知を受信する queueUrl を指定し、これらのコンテナー内のデータにアクセスするように構成されたマウント ポイントを使用する場合にのみ必要です。
bucket/key パスのプレフィックスをマウント ポイントで書き換える場合は、このオプションを使用します。 プレフィックスのみ書き換え可能です。 たとえば、構成 {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}の場合、パス s3://<databricks-mounted-bucket>/path/2017/08/fileA.json は dbfs:/mnt/data-warehouse/2017/08/fileA.jsonに書き換えられます。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
cloudFiles.resourceTag |
None | キーと値のタグ文字列 | 関連するリソースの関連付けと識別に役立つ一連のキーと値のタグ ペア。次に例を示します。cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 代わりに、クラウド プロバイダー コンソールを使用してリソース タグを設定します。詳細については、「 クラウド プロバイダーのリソース タグ」を参照してください。 |
cloudFiles.useManagedFileEvents |
false |
true、false |
trueに設定すると、自動ローダーはファイル イベント サービスを使用して、外部の場所にあるファイルを検出します。 このオプションは、読み込みパスがファイル イベントが有効になっている外部の場所にある場合にのみ使用できます。
ファイル イベントでファイル通知モードを使用するを参照してください。自動ローダーは前回の実行後に新しいファイルを検出できるため、ファイル イベントはファイル検出で通知レベルのパフォーマンスを提供します。 ディレクトリ一覧とは異なり、このプロセスではディレクトリ内のすべてのファイルを一覧表示する必要はありません。 ファイル イベント オプションが有効になっている場合でも、自動ローダーでディレクトリ 一覧が使用される場合があります。
自動ローダーでこのオプション を指定してディレクトリ一覧を使用 する場合の状況の包括的な一覧については、「ファイル イベントを含む自動ローダーでディレクトリ一覧を使用するタイミング」を参照してください。 Databricks Runtime 14.3 LTS 以降で使用できます。 |
cloudFiles.listOnStart |
false |
true、false |
trueに設定すると、自動ローダーは、チェックポイントの継続トークンで始まるのではなく、ストリームの開始時に完全なディレクトリ一覧を実行します。
CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKENなどのエラーから回復するには、このオプションを使用します。
「CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN エラーから回復する方法」を参照してください。 |
cloudFiles.useNotifications |
false |
true、false |
ファイル通知モードを使用して、新しいファイルがあるときを判断するかどうか。
false の場合は、ディレクトリ一覧モードを使用します。 「自動ローダー ファイル検出モードを比較する」を参照してください。cloudFiles.useManagedFileEventsが true に設定されている場合は使用しないでください。 |
クラウド プロバイダーのリソース タグ
自動ローダーでは、ベスト エフォートベースで次のキーと値のタグのペアが既定で追加されます。
-
vendor:Databricks -
path: データが読み込まれる場所。 ラベル付けの制限のため、GCP では使用できません。 -
checkpointLocation: ストリームのチェックポイントの場所。 ラベル付けの制限のため、GCP では使用できません。 -
streamId: ストリームのグローバルに一意な識別子。
Databricks はこれらのキー名を予約し、それらの値を上書きすることはできません。
Azure の詳細については、キューとメタデータの名前付けと、properties.labelsのイベント サブスクリプションに関する情報をご覧ください。 自動ローダーは、これらのキーと値のタグ ペアを JSON にラベルとして保存します。
クラウド固有
自動ローダーには、ファイル通知モード用にクラウド インフラストラクチャを構成するためのオプションがあります。 必要なクラウドのアクセス許可とセットアップ手順については、「 ファイル通知モードで自動ローダー ストリームを構成する」を参照してください。
Azure
cloudFiles.useNotifications
=
trueを指定し、自動ローダーで通知サービスを設定する場合は、次のすべてのオプションの値を指定する必要があります。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cloudFiles.resourceGroup |
None | 任意の文字列 | ストレージ アカウントが作成されるAzure リソース グループ。 |
cloudFiles.subscriptionId |
None | 任意の文字列 | リソース グループが作成されるAzure サブスクリプション ID。 |
databricks.serviceCredential |
None | 任意の文字列 | Databricks のサービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。 |
Databricks サービスの資格情報を使用できない場合は、代わりに次の認証オプションを指定できます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cloudFiles.clientId |
None | 任意の文字列 | サービス プリンシパルのクライアント ID またはアプリケーション ID。 |
cloudFiles.clientSecret |
None | 任意の文字列 | サービス プリンシパルのクライアント シークレット。 |
cloudFiles.connectionString |
None | 接続の文字列 | アカウント アクセス キーまたは Shared Access Signature (SAS) に基づくストレージ アカウントの接続文字列。 |
cloudFiles.tenantId |
None | 任意の文字列 | サービス プリンシパルが作成されるAzure テナント ID。 |
cloudFiles.useNotifications
=
trueを設定し、自動ローダーで既存のキューを使用する場合にのみ、次のオプションを指定します。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
cloudFiles.queueName |
None | 任意の文字列 | Azure キューの名前。 指定した場合、クラウド ファイル ソースは、独自のAzure Event Gridおよび Queue Storage サービスを設定する代わりに、このキューからのイベントを直接使用します。 その場合、databricks.serviceCredential または cloudFiles.connectionString では、キューに対する読み取りアクセス許可のみが必要です。 |
Delta Lake
次のオプションは、 spark.readStreamを使用して Delta Lake テーブルから読み取るときに適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
allowSourceColumnDrop |
None | バージョン番号または always |
差分テーブルのバージョン番号または always に設定すると、ソース テーブル スキーマから列が削除された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
allowSourceColumnRename |
None | バージョン番号または always |
差分テーブルのバージョン番号または always に設定すると、ソース テーブル内の列の名前が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
allowSourceColumnTypeChange |
None | バージョン番号または always |
差分テーブルのバージョン番号または always に設定すると、ソース テーブルで列の種類が変更された後もストリームを続行できます。 バージョン番号に設定すると、そのバージョンまでのすべてのスキーマ変更を確認します。
schemaTrackingLocation が必要です。 「タイプ拡張」を参照してください。 |
excludeRegex |
None | Java正規表現文字列 | 正規表現パターン。 パスがパターンと一致するファイルは、ストリーミング読み取りから除外されます。 予期される名前付け規則に準拠していないファイルをフィルターで除外する場合に便利です。 |
failOnDataLoss |
true |
true、false |
ログの保持 (logRetentionDuration) のためにソース データが削除された場合にストリーミング クエリを失敗させるかどうか。 不足しているデータをスキップして処理を続行するには、 false に設定します。 「タイム トラベル クエリのデータ保持を構成する」を参照してください。 |
ignoreChanges (非推奨) |
false |
true、false |
Databricks Runtime 11.3 LTS 以前で使用できます。
UPDATE、MERGE INTO、DELETE、OVERWRITEなどの変更操作後に、書き換えられたデータ ファイルを再出力します。 変更されていない行は新しい行と共に出力される可能性があるため、ダウンストリーム コンシューマーは重複を処理する必要があります。 削除はダウンストリームには反映されません。 Databricks Runtime 12.2 LTS 以降の skipChangeCommits に置き換えられました。 |
ignoreDeletes (非推奨) |
false |
true、false |
パーティション境界でデータを削除するトランザクションを無視します (パーティションの完全な削除のみ)。 パーティション以外の削除、更新、またはその他の変更は処理しません。
skipChangeCommits を代わりに使用します。 |
readChangeFeed または readChangeData |
false |
true、false |
ストリーミング クエリの変更データ フィードの読み取りを有効にするかどうかを指定します。 有効にすると、ストリームは、追加のメタデータ列を含む行レベルの変更 (挿入、更新、および削除) を出力します。 Azure Databricksでの変更データ フィードの使用を参照してください。 |
schemaTrackingLocation |
None | パス文字列 | Delta Lake がストリーミング読み取りのスキーマ変更を追跡するディレクトリへのパス。 列マッピングが有効になっているテーブルからストリーミングし、スキーマの進化を処理するために allowSourceColumn* オプションを使用する場合に必要です。 ストリーミング クエリの checkpointLocation 内にある必要があります。
「Delta Lake 列マッピングを使用して列の名前を変更および削除する」を参照してください。 |
skipChangeCommits |
false |
true、false |
既存のレコードを削除または変更するトランザクションを無視し、追加のみを処理します。 Databricks では、変更データ フィードを使用しないほとんどのワークロードに対して、このオプションをお勧めします。 Databricks Runtime 12.2 LTS 以降で使用できます。
skipChangeCommitsを使用したアップストリーム変更コミットのスキップを参照してください。 |
startingTimestamp |
利用可能な最新 |
2019-01-01T00:00:00.000Zなどのタイムスタンプ文字列または日付文字列 (例:2019-01-01 |
読み取りを開始するタイムスタンプ。 ストリームは、指定されたタイムスタンプ以降にコミットされたすべてのテーブル変更を読み取ります。 使用可能なすべてのテーブルコミットの前にタイムスタンプがある場合、ストリームは使用可能な最も早いコミットから開始されます。
startingVersionと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。 |
startingVersion |
利用可能な最新 | 正の整数、 0、または latest |
読み取りを開始する差分テーブルのバージョン。 ストリームは、指定したバージョン以降にコミットされたすべての変更を読み取ります。 最新の変更からのみ開始する latest を指定します。
startingTimestampと一緒に使用することはできません。 ストリーミング チェックポイントが既に存在する場合は無視されます。
テーブル履歴の操作を参照してください。 |
withEventTimeOrder |
false |
true、false |
初期テーブル スナップショットをイベント時間バケットに分割して、レコードが誤って遅延イベントとしてマークされ、ウォーターマーク付きのステートフル クエリで削除されないようにします。 チェックポイントを削除しないと、初期スナップショット処理が開始された後は変更できません。 Databricks Runtime 11.3 LTS 以降で使用できます。 「データを削除せずに初期スナップショットを処理する」を参照してください。 |
カフカ
spark.readStream.format("kafka")またはspark.read.format("kafka")で次のオプションを使用します。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
assign |
None | 次のような JSON 文字列 {"topicA":[0,1],"topicB":[2,4]} |
使用する特定のパーティション。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 |
failOnDataLoss |
true |
true、false |
削除されたトピックやオフセットの切り捨てなどによってデータが失われた可能性がある場合にクエリを失敗させるかどうか。 不足しているデータをスキップして続行するには、 false に設定します。Databricks は、データが失われた可能性があるかどうかを保守的に見積もります。 ただし、誤ったアラームが発生する可能性があります。 |
fetchoffset.numretries |
3 |
正の整数または 0 |
Kafka オフセットのフェッチが失敗したときの再試行回数。 |
fetchoffset.retryintervalms |
1000 |
正の整数または 0 |
オフセット フェッチの再試行間隔 (ミリ秒単位)。 |
groupIdPrefix |
spark-kafka-source (ストリーミング)、 spark-kafka-relation (バッチ) |
任意の文字列 | 自動生成された Kafka コンシューマー グループ ID に使用するカスタマイズされたプレフィックス。
kafka.group.idが明示的に設定されている場合、コネクタはこのオプションを無視します。 |
kafka.group.id |
None | 任意の文字列 | 読み取り時に使用する Kafka コンシューマー グループ ID。 使用には注意が必要です。同じグループ ID を共有するクエリは相互に干渉し、部分的なデータのみを読み取る可能性があります。 これは、同時実行バッチとストリーミング ワークロードを実行するとき、またはクエリをすばやく再起動するときに発生する可能性があります。 設定した場合、 groupIdPrefix は無視されます。 問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms を小さな値に設定します。 |
includeHeaders |
false |
true、false |
Kafka メッセージ ヘッダーを列として出力に含めるかどうか。 |
kafkaconsumer.polltimeoutms |
None | 正の整数 | Kafka コンシューマー poll() 呼び出しのタイムアウト (ミリ秒)。 |
kafka.bootstrap.servers |
None |
host:port文字列のコンマ区切りリスト |
Kafka ブローカーの host:port アドレスのコンマ区切りのリスト。 Kafka クライアントの bootstrap.servers プロパティを設定します。Kafka からのデータがない場合は、このブローカーのアドレス一覧で正しくないアドレスを確認してください。 ブローカーのアドレス一覧が正しくない場合は、エラーがない可能性があります。 Kafka クライアントは、ブローカーが最終的に使用可能になると想定し、ネットワーク エラーを受信したときに永久に再試行します。 |
maxRecordsPerPartition |
None | 正の整数 | 各 Spark パーティションのレコードの最大数。 設定すると、コネクタは Kafka パーティションを分割して、各 Spark パーティションが最大でこの多くのレコードを読み取るようにします。 このオプションは、 minPartitionsで使用することもできます。 両方のオプションが設定されている場合、Spark はどちらのオプションを使用してもパーティションが増えます。 |
minPartitions |
None | 正の整数 | Kafka から読み取る Spark パーティションの最小数。 設定すると、コネクタは大きな Kafka パーティションを分割して並列処理を向上させます。 設定しない場合、Spark は Kafka トピック パーティションごとに 1 つのパーティションを作成します。 データ スキューやピーク時の負荷を処理する場合に便利です。 このオプションは、トリガーごとに Kafka コンシューマーを再初期化します。これは、SSL のパフォーマンスに影響する可能性があります。 |
startingOffsets |
latest (ストリーミング)、 earliest (バッチ) |
earliest、 latest、または JSON オフセット文字列 |
クエリが読み取りを開始するオフセット。 JSON 文字列では、 -1 は最新のオフセットです。
-2 は最も早いオフセットです。 たとえば、 {"topicA":{"0":23,"1":-2}}と指定します。ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。 バッチ クエリの場合、 latest は許可されません。 |
startingOffsetsByTimestamp |
None | JSON タイムスタンプ文字列 (例: {"topicA":{"0":1000,"1":2000}} |
各パーティションの開始オフセットのリスト。タイムスタンプとしてミリ秒単位で指定されます。 タイムスタンプのオフセットが存在しない場合、クエリの動作は startingOffsetsByTimestampStrategyによって決定されます。ストリーミング クエリの場合、このオプションは新しいクエリの開始時にのみ適用されます。 再開されたクエリでは、常にチェックポイントが使用されます。 クエリ中、新しいパーティションは最も早いオフセットで読み取りを開始します。 |
startingOffsetsByTimestampStrategy |
error |
error、latest |
startingOffsetsByTimestampまたはstartingTimestampで指定されたタイムスタンプのオフセットが見つからない場合に使用する戦略。
error は例外を発生させます。
latest では、使用可能な最新のオフセットが使用されます。 |
startingTimestamp |
None | 正の整数または 0 |
すべてのパーティションに適用されるグローバル開始タイムスタンプ (ミリ秒単位)。 タイムスタンプのオフセットが存在しない場合、動作は startingOffsetsByTimestampStrategyによって制御されます。 |
subscribe |
None | トピック名のコンマ区切りリスト | サブスクライブするトピック。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 |
subscribePattern |
None | Java正規表現文字列 | トピックのサブスクライブに使用されるパターン。
subscribe、subscribePattern、またはassignオプションのいずれかを正確に指定する必要があります。 たとえば、「 topic.* 」のように入力します。 |
次のオプションは、 spark.readStream.format("kafka")を使用したストリーミング読み取りにのみ適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
bytesEstimateWindowLength |
300s |
10mや ɦ>などの期間文字列600s |
estimatedTotalBytesBehindLatest メトリックの残りのバイト数を見積もるために使用される時間枠。 詳しくは、Kafka メトリックを取得するをご覧ください。 |
maxOffsetsPerTrigger |
None | 正の整数 | トリガー間隔ごとに処理するオフセットの最大数。 オフセットは、トピック パーティション間で比例して分散されます。 |
maxTriggerDelay |
15m |
10mや ɦ>などの期間文字列600s |
トリガーする前に minOffsetsPerTrigger が蓄積されるまで待機する最大時間。 |
minOffsetsPerTrigger |
None | 正の整数 | マイクロバッチをトリガーする前に累積するオフセットの最小数。
maxTriggerDelayに達すると、マイクロバッチは関係なく実行されます。 |
spark.read.format("kafka")を使用したバッチ読み取りにのみ適用されるオフセット オプションについては、「DataFrameReader Kafka オプション」を参照してください。
認証
Databricks では、Unity カタログ サービスの資格情報を使用して、クラウドで管理される Kafka サービス (AWS MSK、Azure Event Hubs、または Google Cloud Managed Kafka) に対する認証を行うことをお勧めします。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
databricks.serviceCredential |
None | 任意の文字列 | クラウドで管理される Kafka サービスに対して認証するための Unity カタログ サービス 資格情報 の名前。 Databricks Runtime 16.1 以降で使用できます。 |
databricks.serviceCredential.scope |
None | 任意の文字列 | サービス資格情報の OAuth スコープ。 これは、Azure Databricksが Kafka サービスのスコープを自動的に推論できない場合にのみ設定します。 |
サービス資格情報を使用できない場合は、SASL/SSL オプション ( kafka.* プロパティとして渡されます) を使用します。 サービス資格情報を使用する場合、 kafka.sasl.mechanism、 kafka.sasl.jaas.config、または kafka.security.protocolを指定する必要はありません。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
kafka.security.protocol |
None | セキュリティ プロトコル文字列 ( SASL_SSL、 SSL、 PLAINTEXT |
ブローカー通信のセキュリティ プロトコル。 |
kafka.sasl.mechanism |
None | SASL メカニズム文字列 ( PLAIN、 SCRAM-SHA-256、 SCRAM-SHA-512、 OAUTHBEARERなど) AWS_MSK_IAM |
SASL メカニズム。 |
kafka.sasl.jaas.config |
None | JAAS 構成文字列 | JAAS ログイン構成文字列。 |
kafka.sasl.login.callback.handler.class |
None | 完全修飾クラス名 | SASL 認証のログイン コールバック ハンドラーの完全修飾クラス名。 |
kafka.sasl.client.callback.handler.class |
None | 完全修飾クラス名 | SASL 認証用のクライアント コールバック ハンドラーの完全修飾クラス名。 |
kafka.ssl.truststore.location |
None | ファイル パス文字列 | SSL 信頼ストア ファイルへのパス。 |
kafka.ssl.truststore.password |
None | 任意の文字列 | SSL 信頼ストア ファイルのパスワード。 |
kafka.ssl.keystore.location |
None | ファイル パス文字列 | SSL キー ストア ファイルへのパス。 |
kafka.ssl.keystore.password |
None | 任意の文字列 | SSL キー ストア ファイルのパスワード。 |
認証のセットアップ手順の詳細については、「 認証」を参照してください。
Pub/Sub
これらのオプションを spark.readStream.format("pubsub") と共に使用して、Google Pub/Sub にサブスクライブします。 オプション subscriptionId、 topicId、および projectId が必要です。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
subscriptionId |
None | 任意の文字列 | 必須。 Pub/Sub サブスクリプション ID。 サブスクリプションが存在しない場合は、コネクタによって作成されます。 |
topicId |
None | 任意の文字列 | 必須。 Pub/Sub トピック ID。 |
projectId |
None | 任意の文字列 | 必須。 Google Cloud プロジェクト ID。 |
numFetchPartitions |
ストリーム初期化時に使用可能な Executor の数の半分 | 正の整数 | サブスクリプションから行をフェッチする並列 Spark タスクの数。 |
maxBytesPerTrigger |
None | 正の整数 | マイクロバッチごとに処理するバイト数のソフト制限。 |
maxRecordsPerFetch |
1000 |
正の整数 | 処理前にタスクごとにフェッチする行数。 |
maxFetchPeriod |
10s |
1sや1m |
各タスクが行を処理する前に取得する時間間隔。 Azure Databricksでは、既定値を使用することをお勧めします。 |
deleteSubscriptionOnStreamStop |
false |
true、false |
trueすると、ストリーミング クエリの終了時に、subscriptionIdのサブスクリプションが削除されます。 |
serviceCredential |
None | 任意の文字列 | Pub/Sub に対して認証するためのAzure Databricks サービス資格情報の名前。 Databricks Runtime 16.1 以降で使用できます。 |
clientEmail |
None | 電子メール アドレス文字列 | Google サービス アカウントのメール アドレス。 サービス資格情報を使用しない場合に必要です。 |
clientId |
None | 任意の文字列 | Google Service Account のクライアント ID。 サービス資格情報を使用しない場合に必要です。 |
privateKey |
None | 秘密キー文字列 | Google サービス アカウントの秘密キー。 サービス資格情報を使用しない場合に必要です。 |
privateKeyId |
None | 任意の文字列 | Google サービス アカウントの秘密キー ID。 サービス資格情報を使用しない場合に必要です。 |
Pub/Sub の詳細については、「 Google Pub/Sub のサブスクライブ」を参照してください。
パルサー
Apache Pulsar からストリーミングするには、これらのオプションを spark.readStream.format("pulsar") と共に使用します。 Databricks Runtime 14.1 以降で使用できます。
次のオプションが必要です。
topic、topics、またはtopicsPatternのいずれかを指定する必要があります。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
service.url |
None | Pulsar サービスの URL 文字列 | Pulsar サービスの Pulsar serviceURL (例: pulsar://broker.example.com:6650)。 |
topic |
None | 任意の文字列 | 使用する 1 つのトピック名。 |
topics |
None | トピック名のコンマ区切りリスト | 使用するトピック名のコンマ区切りのリスト。 |
topicsPattern |
None | Java正規表現文字列 | トピック名と一致するJava正規表現文字列。 |
次のオプションもサポートされています。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
admin.url |
None | URL 文字列 | Pulsar 管理サービスの HTTP URL。
maxBytesPerTriggerが設定されている場合に必要です。 |
allowDifferentTopicSchemas |
false |
true、false |
スキーマが異なる複数のトピックを読み取る場合は、このオプションを使用して、スキーマベースのトピック値の自動逆シリアル化を無効にします。 これが true の場合は、生の値のみが返されます。 |
failOnDataLoss |
true |
true、false |
データが失われたときにクエリを失敗させるかどうか。 たとえば、アイテム保持ポリシーが原因でトピックが削除されたり、メッセージの有効期限が切れたりすると、データ損失が発生する可能性があります。 |
maxBytesPerTrigger |
None | 正の整数 | マイクロバッチごとに処理するバイト数のソフト制限。
admin.url が必要です。 |
pollTimeoutMs |
120000 |
正の整数 | Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。 |
predefinedSubscription |
None | 任意の文字列 | Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される定義済みのサブスクリプション名。 |
startingOffsets |
latest |
latest、 earliest、または JSON オフセット文字列 |
読み取りを開始する場所。 |
subscriptionPrefix |
None | 任意の文字列 | Spark アプリケーションの進行状況を追跡するランダム なサブスクリプションを生成するためにコネクタによって使用されるプレフィックス。 |
waitingForNonExistedTopic |
false |
true、false |
コネクタが目的のトピックが作成されるまで待機するかどうか。 |
次のオプション パターンを使用して、追加の Pulsar クライアント、管理者、リーダーの構成を指定できます。
| Pattern | 構成オプション |
|---|---|
pulsar.admin.* |
Pulsar 管理者構成 |
pulsar.client.* |
やpulsar.client.authPluginClassNameなどの認証オプションを含む pulsar.client.authParams。 |
pulsar.reader.* |
Pulsar リーダー構成 |
Pulsar クライアントと管理者認証オプションの詳細については、「 認証」を参照してください。
認証
Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Azure Databricksでは、シークレットを使用して認証の詳細を格納することをお勧めします。 「シークレットの管理」を参照してください。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
pulsar.client.authPluginClassName |
None | 完全修飾クラス名 | 認証プラグインの完全修飾クラス名。 たとえば、「 org.apache.pulsar.client.impl.auth.AuthenticationTls 」のように入力します。 |
pulsar.client.authParams |
None | 資格情報文字列 | 認証プラグインに文字列として渡される認証資格情報。 たとえば、「 tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem 」のように入力します。 |
pulsar.client.useKeyStoreTls |
false |
true、false |
trueすると、PEM 形式のファイルではなく、KeyStore ベースの TLS 構成が有効になります。 |
pulsar.client.tlsTrustStoreType |
None | 任意の文字列 | TLS 信頼ストア ファイルの形式。 たとえば、「 JKS 」のように入力します。 |
pulsar.client.tlsTrustStorePath |
None | ファイル パス文字列 | 信頼された CA 証明書を含む TLS 信頼ストア ファイルへのパス。
pulsar.client.useKeyStoreTls が true の場合に必要です。 |
pulsar.client.tlsTrustStorePassword |
None | 任意の文字列 | TLS 信頼ストア ファイルのパスワード。 |
ストリームで PulsarAdminを使用する場合は、次のオプションを設定することもできます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
pulsar.admin.authPluginClassName |
None | 完全修飾クラス名 | Pulsar 管理クライアントの認証プラグインの完全修飾クラス名。 |
pulsar.admin.authParams |
None | 資格情報文字列 | Pulsar 管理者クライアント認証プラグインの認証資格情報。 |
pulsar.admin.useTls |
None |
true、false |
Pulsar 管理者クライアント接続に TLS を使用するかどうか。 |
pulsar.admin.tlsAllowInsecureConnection |
None |
true、false |
Pulsar 管理クライアントの安全でない TLS 接続を許可するかどうか。 |
pulsar.admin.tlsTrustCertsFilePath |
None | ファイル パス文字列 | Pulsar 管理クライアントの信頼された TLS 証明書ファイルへのパス。 |
pulsar.admin.useKeyStoreTls |
None |
true、false |
Pulsar 管理クライアントに KeyStore ベースの TLS を使用するかどうか。 |
pulsar.admin.tlsTrustStoreType |
None | 任意の文字列 | Pulsar 管理クライアントの TLS 信頼ストアの形式。 たとえば、「 JKS 」のように入力します。 |
pulsar.admin.tlsTrustStorePath |
None | ファイル パス文字列 | Pulsar 管理クライアントの TLS 信頼ストア ファイルへのパス。
pulsar.admin.useKeyStoreTls が true の場合に必要です。 |
pulsar.admin.tlsTrustStorePassword |
None | 任意の文字列 | Pulsar 管理者クライアント TLS 信頼ストアのパスワード。 |
認証の例については、「 Pulsar に対する認証」を参照してください。
DataFrameWriter オプション
これらのオプションを DataFrameWriter.option() および DataFrameWriterV2.option() と共に使用して、データAzure Databricks書き込む方法を制御します。
Example
次の例では、Delta Lake テーブルを書き込むためのmergeSchemaにTrueを設定します。
Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
Scala
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
アブロ
Avro ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
avroSchema |
None | JSON スキーマ文字列 | JSON 文字列としての完全な Avro スキーマ。 Spark SQL 型を特定の Avro 型に変換するには、このオプションを使用します。 Avro ファイルの読み取りと書き込みに適用されます。 |
avroSchemaUrl |
None | URL 文字列 | Avro スキーマ ファイルを指す URL。 スキーマが外部に格納されている場合は、 avroSchema の代わりに使用します。
avroSchemaと相互に排他的です。
Avro ファイルの読み取りと書き込みに適用されます。 |
compression |
snappy |
uncompressed、 deflate、 snappy (default)、 bzip2、 xz、 zstandard |
書き込み時に使用する圧縮コーデック。 Avro ファイルの読み取りと書き込みに適用されます。 |
recordName |
topLevelRecord |
任意の文字列 | 出力 Avro スキーマの最上位レベルのレコード名。 Avro ファイルの読み取りと書き込みに適用されます。 |
positionalFieldMatching |
false |
true、false |
Spark スキーマと Avro スキーマの間の列を名前ではなくフィールドの位置で照合するかどうか。 Avro ファイルの読み取りと書き込みに適用されます。 |
recordNamespace |
空の文字列 | 任意の文字列 | 出力 Avro スキーマの最上位レコードの名前空間。 Avro ファイルの読み取りと書き込みに適用されます。 |
Delta Lake と Apache Iceberg
Delta Lake テーブルと Apache Iceberg テーブルを記述する場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
clusterByAuto |
false |
true、false |
自動液体クラスタリングを有効にするかどうか。Azure Databricksクエリ パターンに基づいてクラスタリング列が選択されます。
mode("overwrite")でのみ有効です。
append モードでは使用できません。 Databricks Runtime 16.4 以降で使用できます。
[テーブルに液体クラスタリングを使用する] に適用されます。 |
mergeSchema |
None |
true、false |
書き込み操作でスキーマの進化を有効にするかどうかを指定します。 ソース DataFrame の新しい列がターゲット テーブル スキーマに追加されます。 バッチおよびストリーミングの追加に適用されます。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。 |
overwriteSchema |
None |
true、false |
上書き時にテーブル スキーマとパーティション分割を置き換えるかどうか。
mode("overwrite")なしでreplaceWhereが必要です。
partitionOverwriteModeでは使用できません。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。 |
partitionOverwriteMode |
None |
static、dynamic |
パーティション上書きモード。 これを dynamic に設定すると、新しいデータを含むパーティションのみが上書きされ、他のすべてのパーティションは変更されません。 レガシ モード。サーバーレス コンピューティングまたは Databricks SQL ではサポートされていません。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceOn |
None | ブール式の文字列 | ソース クエリの行に置き換えるターゲット テーブル内の行と一致するブール式。 ターゲット テーブルとソース クエリの両方の列を参照できます。 ソース行と一致するターゲット内の行は削除され、置き換えられます。 ソースが空の場合、削除は行われません。 列参照のあいまいさを解消するには、 targetAlias を使用します。 Databricks Runtime 17.1 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceUsing |
None | 列名のコンマ区切りリスト | ターゲット テーブルとソース クエリの間の行を照合するために使用される列名のコンマ区切りのリスト。 ターゲットとソースの両方に、一覧表示されているすべての列が含まれている必要があります。 等値比較でソース行と一致するターゲット内の行は削除され、置き換えられます。
NULL 値は等しくないとして扱われ、一致しません。 Databricks Runtime 16.3 以降で使用できます。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
replaceWhere |
None | 述語式の文字列 | 述語式。 述語に一致するレコードのみをアトミックに上書きします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
targetAlias |
None | 任意の文字列 | ターゲット テーブルの文字列エイリアス。 条件がターゲット テーブルとソース クエリの両方の列を参照する場合に、 replaceOn または replaceWhere を使用して列参照を明確にします。 Delta Lake を使用して データを選択的に上書きする場合に適用されます。 |
txnAppId |
None | 任意の文字列 |
foreachBatch操作でのべき等書き込みのアプリケーションを識別する一意の文字列。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnVersion と共に使用します。
べき等テーブル書き込みにforeachBatchを使用するために適用されます。 |
txnVersion |
None | 単調に増加する整数 |
foreachBatch操作でのべき等書き込みのトランザクション バージョンとして使用される単調に増加する数。 複数の Delta Lake テーブルへの正確な 1 回の書き込みを保証するには、 txnAppId と共に使用します。
べき等テーブル書き込みにforeachBatchを使用するために適用されます。 |
optimizeWrite |
None |
true、false |
この書き込み操作で自動最適化書き込みを有効にするかどうかを指定します。
spark.databricks.delta.optimizeWrite.enabled構成をオーバーライドします。
Azure Databricks?の Delta Lake に適用されます。 |
userMetadata |
None | 任意の文字列 | 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。
DESCRIBE HISTORYの出力に表示されます。
カスタム メタデータを使用したテーブルのエンリッチに適用されます。 |
Csv
CSV ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
charToEscapeQuoteEscaping |
\0 (無効) |
1 文字 | エスケープ文字が引用符文字と異なる場合にエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。 |
compression |
none |
none (default)、bzip2、gzip、lz4、snappy、deflate、zstd |
書き込み時に使用する圧縮コーデック。 csv (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | 日付列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。 |
emptyValue |
空の文字列 | 任意の文字列 | 空の (null 以外の) 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
java.nio.charset.Charset名 |
出力ファイルの文字エンコード。 csv (DataFrameWriter) に適用されます。 |
escape |
\ |
1 文字 | 引用符で囲まれた値をエスケープするために使用される文字。 csv (DataFrameWriter) に適用されます。 |
escapeQuotes |
true |
true、false |
引用符で囲まれたフィールド値内の引用符文字をエスケープするかどうか。 csv (DataFrameWriter) に適用されます。 |
header |
false |
true、false |
出力の最初の行として列名を書き込むかどうか。 csv (DataFrameWriter) に適用されます。 |
ignoreLeadingWhiteSpace |
false |
true、false |
書き込み時に先頭の空白を値からトリミングするかどうか。 csv (DataFrameWriter) に適用されます。 |
ignoreTrailingWhiteSpace |
false |
true、false |
書き込み時に値から末尾の空白をトリミングするかどうかを指定します。 csv (DataFrameWriter) に適用されます。 |
lineSep |
\n |
文字列 | レコード間で使用される行区切り文字列。 csv (DataFrameWriter) に適用されます。 |
locale |
en-US |
java.util.Locale識別子 |
java.util.Locale 識別子。 CSV 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与える、識別されたJavaロケール。 |
nullValue |
空の文字列 | 任意の文字列 | null 値に対して書き込まれた文字列。 csv (DataFrameWriter) に適用されます。 |
quote |
" |
1 文字 | 区切り記号を含むフィールド値を引用符で囲む文字。 csv (DataFrameWriter) に適用されます。 |
quoteAll |
false |
true、false |
内容に関係なく、すべてのフィールド値を引用符で囲むかどうか。 csv (DataFrameWriter) に適用されます。 |
sep |
, |
文字列 | フィールド区切り文字。 csv (DataFrameWriter) に適用されます。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。 csv (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。 |
Excel
Excel ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
dataAddress |
None | シート名またはセル参照文字列 | 書き込みのシート名または開始セル。 省略した場合、セル Sheet1から始まるA1という名前のシートに書き込みます。 シート名 (SheetName) または単一セル参照 (SheetName!A1) を受け入れます。 セル範囲は書き込みではサポートされていません。 |
dateFormatInWrite |
yyyy-mm-dd |
Excel日付書式指定文字列 | Excel c0 /< > 列に適用されるセル書式指定文字列です。 Excel書式構文を使用します。 |
headerRows |
0 |
0、1 |
列名を最初の行として書き込むかどうか。 |
timestampNTZFormat |
yyyy-mm-dd hh:mm:ss |
Excel タイムスタンプ形式の文字列 | Excel TimestampNTZ および Timestamp 列に適用されるセル書式指定文字列。 Excel書式構文を使用します。 |
version |
xlsx |
xlsx、xls |
書き込むExcelファイル形式のバージョン。 |
Json
JSON ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
書き込み時に使用する圧縮コーデック。 json (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | 日付列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
java.nio.charset.Charset名 |
出力ファイルの文字エンコード。 json (DataFrameWriter) に適用されます。 |
ignoreNullFields |
の値 spark.sql.jsonGenerator.ignoreNullFields |
true、false |
JSON 出力から null 値を持つフィールドを省略するかどうか。 json (DataFrameWriter) に適用されます。 |
lineSep |
\n |
文字列 | レコード間で使用される行区切り文字列。 json (DataFrameWriter) に適用されます。 |
locale |
en-US |
java.util.Locale識別子 |
JSON 内の既定の日付、タイムスタンプ、および 10 進解析に影響を与えるJavaロケール識別子。 |
pretty |
false |
true、false |
美しい (インデントされた複数行の) JSON 出力を有効にするかどうか。 |
sortKeys |
false |
true、false |
出力で JSON オブジェクトのキーをアルファベット順に並べ替えるかどうか。 確定的な出力を生成する場合に便利です。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。 json (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイム ゾーン (TimestampNTZType) 列の値を含まないタイムスタンプの書式指定文字列。 |
writeNonAsciiCharacterAsCodePoint |
false |
true、false |
出力内のリテラル UTF-8 文字ではなく、非 ASCII 文字 \uXXXX Unicode エスケープ シーケンスとしてエンコードするかどうか。 |
オーク
ORC ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
compression |
zstd |
none、 uncompressed、 snappy、 zlib、 lzo、 zstd、 lz4、 brotli |
書き込み時に使用する圧縮コーデック。 orc (DataFrameWriter) に適用されます。 |
寄木細工
Parquet ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
compression |
snappy |
none、 uncompressed、 snappy、 gzip、 lzo、 brotli、 lz4、 lz4_raw、 zstd |
書き込み時に使用する圧縮コーデック。 Parquet (DataFrameWriter) に適用されます。 |
spark.sql.parquet.outputTimestampType |
INT96 |
INT96、TIMESTAMP_MICROS、TIMESTAMP_MILLIS |
タイムスタンプ列のエンコードに使用される物理型。 標準のタイムスタンプ型をサポートしていない従来の Parquet リーダーとの互換性を保つには、 INT96 を使用します。 |
テキスト
テキスト ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
書き込み時に使用する圧縮コーデック。 テキスト ( DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
java.nio.charset.Charset名 |
出力ファイルの文字エンコード。 |
lineSep |
\n |
文字列 | レコード間で使用される行区切り文字列。 テキスト ( DataFrameWriter) に適用されます。 |
Xml
XML ファイルを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
arrayElementName |
item |
任意の文字列 | 明示的な名前を持たない配列要素の要素名。 xml (DataFrameWriter) に適用されます。 |
attributePrefix |
_ |
任意の文字列 | XML 属性に対応するフィールド名の前に付加されるプレフィックス。 xml (DataFrameWriter) に適用されます。 |
compression |
none |
none、bzip2、gzip、lz4、snappy、deflate、zstd |
書き込み時に使用する圧縮コーデック。 xml (DataFrameWriter) に適用されます。 |
dateFormat |
yyyy-MM-dd |
日付書式指定文字列 | 日付列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
XML 宣言文字列、または抑制する空の文字列 | 各出力ファイルの先頭に書き込まれた XML 宣言文字列。 宣言を抑制する空の文字列に設定します。 xml (DataFrameWriter) に適用されます。 |
encoding |
UTF-8 |
java.nio.charset.Charset名 |
出力ファイルの文字エンコード。 xml (DataFrameWriter) に適用されます。 |
indent |
4 スペース | 任意の文字列 | 出力内の子要素のインデントに使用される文字列。 インデントをオフにし、各行を 1 行に書き込むには、空の文字列に設定します。 |
locale |
en-US |
java.util.Locale識別子 |
XML 内の既定の日付、タイムスタンプ、および 10 進形式に影響を与えるJavaロケール識別子。 |
nullValue |
null |
任意の文字列 | null 値に対して書き込まれた文字列。
nullに設定すると、null フィールドの属性と子要素は省略されます。
xml (DataFrameWriter) に適用されます。 |
rootTag |
ROWS |
任意の文字列 | 出力内のすべての行要素をラップするルート要素タグ。 xml (DataFrameWriter) に適用されます。 |
rowTag |
ROW |
任意の文字列 | 出力内の行を表す要素タグ。 xml (DataFrameWriter) に適用されます。 |
singleVariantColumn |
None | 列名の文字列 | XML ファイルに書き込む 1 つの Variant 列の名前。 xml (DataFrameWriter) に適用されます。 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
タイムスタンプ形式の文字列 | タイムスタンプ列の値の書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] |
タイムスタンプ形式の文字列 | タイム ゾーン列の値を含まないタイムスタンプの書式指定文字列。 xml (DataFrameWriter) に適用されます。 |
validateName |
true |
true、false |
列名が有効な XML 要素識別子でない場合に例外をスローするかどうか。 xml (DataFrameWriter) に適用されます。 |
valueTag |
_VALUE |
任意の文字列 | 属性または子要素を持つ XML 要素の文字データに使用されるフィールド名。 xml (DataFrameWriter) に適用されます。 |
DataStreamWriter オプション
ストリーミング書き込みを構成するには、これらのオプションと DataStreamWriter.option() を使用します。
Example
次の例では、ストリームのチェックポイントの場所を設定します。
Python
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
Scala
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
共通
次のオプションは、すべてのストリーミング書き込み操作に適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
checkpointLocation |
なし (必須) | パス文字列 | ストリーミング クエリのチェックポイント ディレクトリへのパス。 フォールト トレランスと正確に 1 回の処理の保証に必要です。 各ストリーミング クエリでは、一意のチェックポイントの場所を使用する必要があります。 Databricks では、Unity カタログ ボリュームまたはクラウド ストレージ パスにチェックポイントを格納することをお勧めします。 「構造化ストリーミング チェックポイント」を参照してください。 |
path |
None | パス文字列 | Parquet などのファイル ベースのストリーミング シンクの出力パス。 ファイル ベースの形式にのみ適用されます。 |
コンソール シンク
コンソール シンクにストリームを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
numRows |
20 |
正の整数 | コンソール シンクに書き込むときにマイクロバッチごとに表示する行数。 |
truncate |
true |
true、false |
行を表示するときに長い文字列を切り捨てるかどうかを指定します。 完全な文字列値を表示するには、 false に設定します。 |
Delta Lake
format("delta")を使用して Delta Lake テーブルにストリームを書き込む場合は、次のオプションが適用されます。
overwriteSchema、replaceWhere、partitionOverwriteModeなどの上書き専用オプションは、ストリーミング書き込みではサポートされていません。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
mergeSchema |
false |
true、false |
ストリーミング DataFrame に新しい列が含まれている場合に Delta Lake テーブル スキーマを進化させるかどうか。 追加出力モードにのみ適用されます。 スキーマ の進化に伴うテーブル スキーマの更新に適用されます。 |
userMetadata |
None | 任意の文字列 | 書き込み操作のコミット メタデータに追加されたユーザー定義文字列。
DESCRIBE HISTORYの出力に表示されます。
カスタム メタデータを使用したテーブルのエンリッチに適用されます。 |
ファイル シンク
次のオプションは、ストリームをファイル ベースの形式 (Parquet、JSON、CSV、ORC、text) に書き込む場合に適用されます。 形式固有のオプションについては、「 DataFrameWriter オプション」を参照してください。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
retention |
None |
7 daysや24 hours |
フォールト トレランスと圧縮に使用されるシンク メタデータ ファイルを保持する期間。 設定しない場合、メタデータ ファイルは無期限に保持されます。 |
Kafka シンク
Kafka に書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
kafka.bootstrap.servers |
None |
host:port文字列のコンマ区切りリスト |
必須。 Kafka ブローカー host:port アドレスのコンマ区切りのリスト。 |
topic |
None | 任意の文字列 | すべての行のターゲット Kafka トピック。 DataFrame に topic 列が含まれていない場合は必須です。 |
kafka.* |
None | Kafka プロデューサーの構成値 |
プレフィックスが付いた kafka.。 たとえば、「 kafka.compression.type 」のように入力します。 |
メモリ シンク
メモリ シンクにストリームを書き込む場合は、次のオプションが適用されます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
queryName |
なし (必須) | 任意の文字列 | クエリが書き込むメモリ内テーブルの名前。 メモリ シンクに必要です。
.queryName()を介して構成することもできます。 |
mode |
exactlyonce |
exactlyonce、atleastonce |
メモリ シンクの配信保証。
exactlyonce は、厳密に 1 回のセマンティクスを持つマイクロバッチ モードを使用します。
atleastonce は、少なくとも 1 回のセマンティクスを持つ連続モードを使用します。 |
Spark 関数のオプション
一部の Spark SQL 組み込み関数は、解析またはシリアル化の動作を制御する options マップを受け入れます。 オプションを Python dict または Scala Map[String, String] として渡します。
Example
次の例では、形式が正しくないレコードを削除しながら JSON 列を解析します。
Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val df = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
アブロ
Avro 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_avroschema_of_avroを使用。 -
to_avroでは 、DataFrameWriter Avro オプションが使用されます。
Example
次の例では、スキーマの進化が有効になっている Avro 列をデコードします。
Python
from pyspark.sql.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
Scala
import org.apache.spark.sql.avro.functions.from_avro
val df = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart")))
さらに、 from_avro と to_avro のスキーマ レジストリバリアントでは、次のオプションを受け入れます。
| Key | デフォルト | 有効な値 | Description |
|---|---|---|---|
schemaId |
None | スキーマ ID の整数 |
jsonFormatSchemaと互換性のないスキーマでエンコードされた Avro データをデコードするときに使用する Confluent スキーマ レジストリのスキーマ ID。
from_avroにのみ適用されます。 |
confluent.schema.registry.* |
None | Confluent SR クライアント プロパティ値 | Confluent Schema Registry クライアント構成プロパティ。 基本認証資格情報の confluent.schema.registry.basic.auth.user.info など、このプレフィックスを使用して Confluent SR クライアント プロパティを渡します。
from_avroとto_avroのスキーマ レジストリバリアントに必要です。 |
Csv
CSV 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_csvschema_of_csvを使用。 -
to_csvでは 、DataFrameWriter CSV オプションが使用されます。
Example
次の例では、カスタム区切り記号と NULL 値を含む CSV を読み取ります。
Python
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
Scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
Json
JSON 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_jsonschema_of_jsonを使用。 -
to_jsonでは 、DataFrameWriter JSON オプションが使用されます。
Example
次の例では、 NULL フィールドが無視され、整形書式設定が有効になっている JSON を書き込みます。
Python
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
Scala
import org.apache.spark.sql.functions.to_json
val df = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf と to_protobuf はファイルベースのデータソースを使用しません。 Protobuf データは、これらの関数を使用して常にバイナリ列として読み書きされます。 オプションは Map[String, String] として渡され、大文字と小文字が区別されます。
Example
次の例では、PERMISSIVE モードを使用して Protobuf 列をデコードします。
Python
from pyspark.sql.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
Scala
import org.apache.spark.sql.protobuf.functions.from_protobuf
val df = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true")))
Protobuf 関数では、次のオプションを使用します。
Xml
XML 関数は、対応する DataFrame オプションと同じオプションを受け入れます。
-
from_xmlschema_of_xmlを使用。 -
to_xmlでは 、DataFrameWriter XML オプションが使用されます。
Example
次の例では、カスタム ルートタグと行タグを使用して XML を書き込みます。
Python
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
Scala
import org.apache.spark.sql.functions.to_xml
val df = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record")))