XML ファイルの読み取りと書き込み

重要

この機能はパブリック プレビュー段階にあります。

拡張マークアップ言語 (XML) は、テキスト形式でデータを書式設定、格納、共有するためのマークアップ言語です。 この言語では、ドキュメントから任意のデータ構造まで、さまざまなデータをシリアル化するための一連のルールが定義されます。

Azure Databricksでは、Apache Spark での読み取りと書き込みの両方に XML がサポートされています。これには、スキーマの自動推論と進化、行タグの構成、XSD 検証、from_xmlなどの SQL 式が含まれます。 ネイティブ XML サポートは、外部 jar を必要とせずに、自動ローダー、 read_files、および COPY INTO で動作します。

前提条件

XML ファイル形式のサポートには、Databricks Runtime 14.3 以降が必要です。

オプション

.option().options()DataFrameReader メソッドおよび DataFrameWriter メソッドを使用して、XML データ ソースを構成します。 サポートされているオプションの完全な一覧については、DataFrameReader XML オプション および DataFrameWriter XML オプション を参照してください。

XML レコードを解析する

XML の仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされるわけではありません。 rowTag DataFrame にマップする XML 要素を示すには、Row オプションを指定する必要があります。 rowTag 要素は最上位レベルの struct になります。 rowTag の子要素は最上位レベルの struct のフィールドになります。

このレコードのスキーマを指定することも、自動的に推論させることもできます。 パーサーは rowTag 要素のみ調査するため、DTD と外部エンティティはフィルター処理されません。

次の例は、さまざまな rowTag オプションを使用した XML ファイルのスキーマ推論と解析を示しています。

Python

xmlString = """
  <reviews>
    <review id="r001">
      <author>Alice</author>
      <rating>5</rating>
      <comment>Amazing stay, highly recommend!</comment>
    </review>
    <review id="r002">
      <author>Bob</author>
      <rating>4</rating>
      <comment>Great location, very comfortable</comment>
    </review>
  </reviews>"""

xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)

スカラ (プログラミング言語)

val xmlString = """
  <reviews>
    <review id="r001">
      <author>Alice</author>
      <rating>5</rating>
      <comment>Amazing stay, highly recommend!</comment>
    </review>
    <review id="r002">
      <author>Bob</author>
      <rating>4</rating>
      <comment>Great location, very comfortable</comment>
    </review>
  </reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString)

rowTagとして、"reviews" オプションを使用して XML ファイルを読み取る:

Python

df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

スカラ (プログラミング言語)

val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'reviews'
)

出力:

root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)

+----------------------------------------------------------------------------------------+
|review                                                                                  |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+

rowTagとして"review"を含む XML ファイルを読み取る:

Python

df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:

スカラ (プログラミング言語)

val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses `review` in separate rows:

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'review'
)

出力:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)

+----+------+--------------------------------+------+
|_id |author|comment                         |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5     |
|r002|Bob   |Great location, very comfortable|4     |
+----+------+--------------------------------+------+

XSD を使用して XML レコードを検証する

オプションで、XML スキーマ定義 (XSD) によって各行レベルの XML レコードを検証できます。 XSD ファイルは rowValidationXSDPath オプションで指定されます。 XSD は、指定された、または推定されたスキーマにそれ以外には影響しません。 検証に失敗したレコードは "破損" としてマークされ、オプション セクションで説明されている破損したレコード処理モード オプションに基づいて処理されます。

XSDToSchema を使用して、XSD ファイルから Spark DataFrame スキーマを抽出します。 これは単純型、複合型、シーケンス型のみと、基本的な XSD 機能のみをサポートしています。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="review">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="rating" type="xs:integer" />
          <xs:element name="comment" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

次の表は、XSD データ型から Sparkデータ型への変換を示しています。

XSD データ型 Spark データ型
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
shortunsignedByte ShortType
integernegativeIntegernonNegativeIntegernonPositiveIntegerpositiveIntegerunsignedShort IntegerType
longunsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

入れ子になった XML を解析する

schema_of_xmlfrom_xml を使用して既存の DataFrame 内の文字列値列の XML データを解析することで、スキーマと解析結果を新しい struct 列として返すことができます。 schema_of_xmlfrom_xml への引数として渡される XML データは、1 つの整形式の XML レコードである必要があります。

XMLのスキーマ

schema_of_xmlを使用して、XML 文字列から Spark スキーマを推論します。 結果を from_xml に渡して XML 列を解析します。

構文: schema_of_xml(xmlStr [, options])

引数 Required 説明
xmlStr Yes 単一の整形式 XML レコードを指定する STRING 式。
options No ディレクティブを指定する MAP<STRING,STRING> リテラル。

XML 要素と属性名から列名が派生する文字列の n 個のフィールドを持つ構造体の定義を保持する STRING を返します。 フィールドの値は、派生した書式付き SQL 型を保持します。

from_xml

from_xmlを使用して、XML レコードを含む STRING 列を構造体に解析します。 スキーマを直接指定するか、 schema_of_xmlの出力を使用します。

構文: from_xml(xmlStr, schema [, options])

引数 Required 説明
xmlStr Yes 単一の整形式 XML レコードを指定する STRING 式。
schema Yes schema_of_xml関数の STRING 式または呼び出し。
options No ディレクティブを指定するための MAP<STRING,STRING> リテラル。

スキーマ定義に一致するフィールド名と型を持つ構造体を返します。 スキーマは、コンマで区切られた列名とデータ型のペアとして定義する必要があります (例: CREATE TABLE)。 [オプション] セクションに表示されるほとんどのオプションは、次の例外を除いて適用できます。

  • rowTag: XML レコードが 1 つしかないため、rowTag オプションは適用されません。
  • mode(既定値: PERMISSIVE): 解析中に破損したレコードを処理するモードを許可します。
    • PERMISSIVE: 破損したレコードに合致する場合は、columnNameOfCorruptRecord によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドを null に設定します。 破損したレコードを保持するには、ユーザー定義スキーマで columnNameOfCorruptRecord という名前の文字列型フィールドを設定できます。 スキーマにフィールドがない場合、破損したレコードが解析中に削除されます。 スキーマを推論すると、出力スキーマに columnNameOfCorruptRecord フィールドが暗黙的に追加されます。
    • FAILFAST: 破損したレコードに合致する場合に、例外をスローします。

XML 文字列列を解析するには、 schema_of_xml を使用してスキーマを推論し、それを from_xmlに渡します。

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>
"""

df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

スカラ (プログラミング言語)

import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}

val xmlData = """
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>""".stripMargin

val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

SQL でインライン XML を解析するには:

SELECT from_xml('
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>',
  schema_of_xml('
  <review id="r001">
    <author>Alice</author>
    <rating>5</rating>
    <comment>Amazing stay, highly recommend!</comment>
  </review>')
);

XML 構造体と DataFrame 構造体の間で変換する

DataFrame と XML の構造上の違いにより、XML データから DataFrame への変換規則と DataFrame から XML データへの変換規則がいくつか存在します。 オプション excludeAttribute を含む場合は、属性の処理を無効にできることに注意してください。

XML から DataFrame への変換

XML を読み取る場合、Azure Databricksは、次の規則に従って XML 要素と属性を DataFrame フィールドにマップします。

属性は、見出しプレフィックス attributePrefixを持つフィールドとして変換されます。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

これにより、次のスキーマが生成されます。

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

属性または子要素を含む要素内の文字データは、 valueTag フィールドに解析されます。 文字データが複数ある場合、valueTag フィールドは array 型に変換されます。

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

これにより、次のスキーマが生成されます。

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _myTwoAttrib: string (nullable = true)
 |-- three: string (nullable = true)

DataFrame から XML への変換

データフレームを XML に書き込む場合、入れ子になった特定の構造体では、DataFrame データ モデルと XML データ モデルの違いにより、特別な処理が必要になります。

要素型もArrayTypeArrayTypeフィールドが DataFrame に含まれている場合、XML に書き込むと、XML ファイルをラウンドトリップするときに存在しない追加の入れ子レベルが生成されます。 これは、XML の外部からソース化された DataFrame にのみ影響します。XML ファイルの読み取りと書き込みは元の構造を保持します。

たとえば、次のスキーマを持つ DataFrame です。

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

および次のデータ:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

では、次の XML 出力が生成されます。

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

DataFrame 内の名前のない配列の要素名はオプション arrayElementName で指定されます (既定値: item)。

復旧されたデータ列を有効にする

復旧されたデータ列により、ETL 中にデータが失われることはありません。 レコード内の 1 つ以上のフィールドに次のいずれかの問題があるため、解析されなかったデータがキャプチャされます。

  • 指定されたスキーマに存在しない。
  • 指定されたスキーマのデータ型と一致しない。
  • 指定されたスキーマのフィールド名と大文字と小文字の区別が一致しない。

復旧されたデータ列は、復旧された列と、レコードのソース ファイル パスを含む JSON ドキュメントとして返されます。

復旧されたデータ列を有効にするには、読み取り時に rescuedDataColumn オプションを列名に設定します。

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")

スカラ (プログラミング言語)

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
  format => 'xml',
  rowTag => 'review',
  rescuedDataColumn => '_rescued_data'
)

復旧されたデータ列からソース ファイルのパスを削除するには、次の値を設定します。

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

XML パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と併用した場合、データ型の不一致が原因で、DROPMALFORMED モードでレコードが破棄されたり、FAILFAST モードでエラーが発生したりすることはありません。 破損レコード(不完全または不正な形式の XML)のみが、除外されるか、エラーになります。

自動ローダーを使用してスキーマを推論および進化させる

このトピックと適用可能なオプションの詳細については、「自動ローダーでのスキーマの推論と展開の構成」を参照してください。 読み込まれた XML データのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。

既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化の問題を回避します。 データ型 (JSON、CSV、XML) をエンコードしない形式の場合、自動ローダーは XML ファイル内の入れ子になったフィールドを含め、すべての列を文字列として推論します。 Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて XML ソースの列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypestrue に設定します。

自動ローダーでは、データを処理する際に新しい列の追加を検出します。 自動ローダーで新しい列が検出されると、ストリームが UnknownFieldException で停止します。 ストリームでこのエラーが発生する前に、Auto Loader はデータの最新のマイクロバッチに対してスキーマ推論を実行し、新しい列をスキーマの末尾に追加してマージすることで、スキーマ保存場所を最新のスキーマで更新します。 既存の列のデータ型は変更されません。 自動ローダーでは、スキーマの展開に関するさまざまなモードがサポートされています。これは、cloudFiles.schemaEvolutionMode オプションで設定できます。

スキーマ ヒントを使用して、推論されたスキーマに対して知っているか予想されるスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合や、さらに一般的なデータ型 (たとえば、整数ではなく浮動小数点を選択する場合は、SQL スキーマ仕様構文を使用して文字列として、列のデータ型に任意の数のヒントを指定できます。 復旧されたデータ列が有効になっているときは、スキーマの大文字または小文字とは異なる名前が付けられたフィールドが _rescued_data 列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitivefalse に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。

Usage

次の例では 、Wanderbricks データセット を使用して、Spark DataFrame API と SQL を使用した XML ファイルの読み取りと書き込みを示します。

XML の読み取りと書き込み

DataFrame API を使用して Wanderbricks レビューを XML に書き込み、読み戻します。

Python

# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
  .format("xml") \
  .option("rootTag", "reviews") \
  .option("rowTag", "review") \
  .save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")

# Read the XML file back
df_read = spark.read \
  .format("xml") \
  .option("rowTag", "review") \
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()

スカラ (プログラミング言語)

// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
  .format("xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")

// Read the XML file back
val dfRead = spark.read
  .format("xml")
  .option("rowTag", "review")
  .xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()

R

df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")

データを読み取る際に、スキーマを手動で指定できます:

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()

スカラ (プログラミング言語)

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("rating", "integer"),
  structField("comment", "string"))

df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")

SQL を使用した XML の読み取りと書き込み

SQL DDL を使用して、XML ファイルからテーブルを作成します。 Azure Databricksは、列の型を自動的に推論します。

DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;

DDL で列名と型を指定することもできます。 この場合、スキーマは自動的に推論されません。

DROP TABLE IF EXISTS reviews;

CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");

COPY INTO を使用して XML を読み込む

COPY INTOを使用して、クラウド ストレージから Delta テーブルに XML ファイルを読み込みます。

DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;

COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');

行の検証を使用した XML の読み取り

rowValidationXSDPath オプションを使用して、読み取り中に XSD スキーマに対して各行を検証します。

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "review")
    .option("rowValidationXSDPath", xsdPath)
    .load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()

スカラ (プログラミング言語)

val df = spark.read
  .option("rowTag", "review")
  .option("rowValidationXSDPath", xsdPath)
  .xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
  format => 'xml',
  rowTag => 'review',
  rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)

自動ローダーを使用した XML の読み込み

自動ローダーを使用して、自動スキーマ推論と進化を使用して、クラウド ストレージから Delta テーブルに XML ファイルを継続的に取り込みます。

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "review")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("reviews")
)

スカラ (プログラミング言語)

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "review")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())
  .toTable("reviews")

その他のリソース