Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em uma versão prévia.
XML (Extensible Markup Language ) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ela define um conjunto de regras para serializar dados que variam de documentos a estruturas de dados arbitrárias.
Azure Databricks dá suporte a XML para leitura e gravação com Apache Spark, incluindo inferência e evolução automáticas de esquema, configuração de marca de linha, validação XSD e expressões SQL como from_xml. O suporte nativo a XML funciona com o Auto Loader, read_files e COPY INTO, sem exigir arquivos JAR externos.
Pré-requisitos
O suporte ao formato de arquivo XML requer o Databricks Runtime 14.3 e superior.
Opções
Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar fontes de dados XML. Para obter uma lista completa de opções com suporte, consulte DataFrameReader opções XML e DataFrameWriter opções XML.
Analisar registros XML
A especificação do XML exige uma estrutura bem formada. No entanto, essa especificação não é mapeada imediatamente para um formato tabular. Você deve especificar a opção rowTag para indicar o elemento XML que mapeia para um DataFrameRow. O elemento rowTag torna-se o struct de nível superior. Os elementos filhos de rowTag tornam-se os campos do struct de nível superior.
É possível especificar o esquema para esse registro ou permitir que ele seja inferido automaticamente. Como o analisador examina apenas os elementos rowTag, a DTD e as entidades externas são filtradas.
Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes opções rowTag:
Python
xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala (linguagem de programação)
val xmlString = """
<reviews>
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
<review id="r002">
<author>Bob</author>
<rating>4</rating>
<comment>Great location, very comfortable</comment>
</review>
</reviews>"""
val xmlPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
dbutils.fs.put(xmlPath, xmlString)
Leia o arquivo XML usando a opção rowTag como "reviews":
Python
df = spark.read.option("rowTag", "reviews").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala (linguagem de programação)
val df = spark.read.option("rowTag", "reviews").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'reviews'
)
Saída:
root
|-- review: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- comment: string (nullable = true)
| | |-- rating: string (nullable = true)
+----------------------------------------------------------------------------------------+
|review |
+----------------------------------------------------------------------------------------+
|[{r001, Alice, Amazing stay, highly recommend!, 5}, {r002, Bob, Great location..., 4}] |
+----------------------------------------------------------------------------------------+
Leia o arquivo XML com rowTag como "review":
Python
df = spark.read.option("rowTag", "review").format("xml").load(xmlPath)
# Infers four top-level fields and parses `review` in separate rows:
Scala (linguagem de programação)
val df = spark.read.option("rowTag", "review").xml(xmlPath)
// Infers four top-level fields and parses `review` in separate rows:
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review'
)
Saída:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- comment: string (nullable = true)
|-- rating: string (nullable = true)
+----+------+--------------------------------+------+
|_id |author|comment |rating|
+----+------+--------------------------------+------+
|r001|Alice |Amazing stay, highly recommend! |5 |
|r002|Bob |Great location, very comfortable|4 |
+----+------+--------------------------------+------+
Validar registros XML com XSD
Opcionalmente, é possível validar cada registro XML em nível de linha por meio de uma Definição de Esquema XML (XSD). O arquivo XSD é especificado na opção rowValidationXSDPath. O XSD não afeta de outra forma o esquema fornecido ou inferido. Um registro que falha na validação é marcado como "corrompido" e manipulado com base na opção de modo de tratamento de registro corrompido descrita na seção de opção.
Você pode usar XSDToSchema para extrair um esquema DataFrame do Spark de um arquivo XSD. Ele tem suporte apenas para tipos simples, complexos e sequenciais, e dá suporte apenas para a funcionalidade básica do XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "/Volumes/<catalog>/<schema>/<volume>/reviews.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="review">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="rating" type="xs:integer" />
<xs:element name="comment" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
A tabela a seguir mostra a conversão de tipos de dados XSD em tipos de dados do Spark:
| Tipos de dados XSD | Tipos de dados do Spark |
|---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short, unsignedByte |
ShortType |
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort |
IntegerType |
long, unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Analisar XML aninhado
Os dados XML em uma coluna com valor de cadeia de caracteres em um DataFrame existente podem ser analisados com schema_of_xml e from_xml que retornam o esquema e os resultados analisados como novas colunas struct. Os dados XML passados como argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.
esquema_de_XML
Use schema_of_xml para inferir o esquema Spark de uma cadeia de caracteres XML. Passe o resultado para from_xml para que analise colunas XML.
Sintaxe: schema_of_xml(xmlStr [, options])
| Argumento | Required | Descrição |
|---|---|---|
xmlStr |
Yes | Uma expressão STRING que especifica um único registro XML bem formado. |
options |
Não | Um literal MAP<STRING,STRING> que especifica diretivas. |
Retorna um STRING que contém uma definição de um struct com n campos de cadeias de caracteres em que os nomes de coluna são derivados do elemento XML e dos nomes de atributo. Os valores dos campos contêm os tipos SQL formatados derivados.
from_xml
Use from_xml para analisar uma coluna STRING que contém registros XML em um struct. Forneça um esquema diretamente ou use a saída de schema_of_xml.
Sintaxe: from_xml(xmlStr, schema [, options])
| Argumento | Required | Descrição |
|---|---|---|
xmlStr |
Yes | Uma expressão STRING que especifica um único registro XML bem formado. |
schema |
Yes | Uma expressão STRING ou a invocação da função schema_of_xml. |
options |
Não | Um literal MAP<STRING,STRING> que especifica diretivas. |
Retorna um struct com nomes de campo e tipos que correspondem à definição de esquema. O esquema deve ser definido como pares de nomes de colunas e tipos de dados separados por vírgulas, como usado, por exemplo, em CREATE TABLE. A maioria das opções mostradas na seção Opções é aplicável com as seguintes exceções:
-
rowTag: como há apenas um registro XML, a opçãorowTagnão é aplicável. -
mode(padrão:PERMISSIVE): permite um modo para lidar com registros corrompidos durante a análise.-
PERMISSIVE: quando um registro corrompido e encontrado, a cadeia de caracteres malformada é colocada em um campo configurado porcolumnNameOfCorruptRecorde os campos malformados são definidos comonull. Para manter os registros corrompidos, é possível configurar um campo do tipo cadeia de caracteres chamadocolumnNameOfCorruptRecordem um esquema definido pelo usuário. Se o esquema não tiver o campo, ele removerá os registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um campocolumnNameOfCorruptRecorda um esquema de saída. -
FAILFAST: lança uma exceção quando encontra registros corrompidos.
-
Exemplos
Para analisar uma coluna de cadeia de caracteres XML, use schema_of_xml para inferir o esquema e, em seguida, passá-lo para from_xml:
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>
"""
df = spark.createDataFrame([(1, xml_data)], ["review_id", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala (linguagem de programação)
import org.apache.spark.sql.functions.{from_xml, schema_of_xml, lit}
val xmlData = """
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>""".stripMargin
val df = Seq((1, xmlData)).toDF("review_id", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
Para analisar XML embutido no SQL:
SELECT from_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>',
schema_of_xml('
<review id="r001">
<author>Alice</author>
<rating>5</rating>
<comment>Amazing stay, highly recommend!</comment>
</review>')
);
Converter entre estruturas XML e DataFrame
Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML para DataFrame e de DataFrame para dados XML. Observe que os atributos de tratamento podem ser desabilitados com a opção excludeAttribute.
Conversão de XML para DataFrame
Ao ler XML, Azure Databricks mapeia elementos XML e atributos para campos DataFrame de acordo com as regras a seguir.
Os atributos são convertidos em campos com o prefixo de título attributePrefix.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
Isso produz o seguinte esquema:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Os dados de caracteres em um elemento que contém atributo(s) ou elemento(s) filho(s) são interpretados no campo valueTag. Se houver várias ocorrências de dados de caracteres, o campo valueTag será convertido em um tipo array.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
Isso produz o seguinte esquema:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversão de DataFrame para XML
Ao gravar um DataFrame em XML, determinadas estruturas aninhadas exigem tratamento especial devido a diferenças entre os modelos de dados DataFrame e XML.
Se um DataFrame contiver um campo ArrayType cujo tipo de elemento também é ArrayType, gravá-lo em XML produzirá um nível extra de aninhamento que não está presente ao fazer a conversão de ida e volta de arquivos XML. Isso afeta apenas DataFrames originados fora do XML — ler e gravar arquivos XML preserva a estrutura original.
Por exemplo, um DataFrame com o seguinte esquema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e os seguintes dados:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produz a seguinte saída XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
O nome do elemento da matriz sem nome em DataFrame é especificado pela opção arrayElementName (Padrão: item).
Habilitar a coluna de dados resgatados
A coluna de dados resgatada garante que você nunca perca dados durante o ETL. Ele captura todos os dados que não foram analisados porque um ou mais campos em um registro têm um dos seguintes problemas:
- Ausente do esquema fornecido.
- Não corresponde ao tipo de dados do esquema fornecido.
- Tem uma incompatibilidade de maiúsculas e minúsculas com os nomes de campo no esquema fornecido.
A coluna de dados resgatada é retornada como um documento JSON que contém as colunas que foram resgatadas e o caminho do arquivo de origem do registro.
Para habilitar a coluna de dados resgatada, defina a opção rescuedDataColumn como um nome de coluna ao ler:
Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
Scala (linguagem de programação)
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load("/Volumes/<catalog>/<schema>/<volume>/reviews_xml")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_xml',
format => 'xml',
rowTag => 'review',
rescuedDataColumn => '_rescued_data'
)
Para remover o caminho do arquivo de origem da coluna de dados resgatada, defina:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
O analisador XML dá suporte para três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usado junto a rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam removidos no modo DROPMALFORMED ou geram um erro no modo FAILFAST. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.
Inferir e evoluir o esquema com o Carregador Automático
Para obter uma discussão detalhada sobre esse tópico e as opções aplicáveis, consulte Configurar a inferência e evolução do esquema no Carregador Automático. Você pode configurar o Carregador Automático para detectar automaticamente o esquema dos dados XML carregados, permitindo que você inicialize tabelas sem declarar explicitamente o esquema de dados e evolua o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de controlar e aplicar manualmente as alterações de esquema ao longo do tempo.
Por padrão, a inferência de esquema do Carregador Automático busca evitar problemas de evolução do esquema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Carregador Automático infere todas as colunas como cadeias de caracteres, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de amostra. Para habilitar esse comportamento com o Carregador Automático, defina a opção cloudFiles.inferColumnTypes como true.
O Carregador Automático detecta a adição de novas colunas à medida que processa seus dados. Quando o Carregador Automático detecta uma nova coluna, o fluxo é interrompido com um UnknownFieldException. Antes de o fluxo gerar esse erro, o Carregador Automático executa a inferência de esquema no microlote de dados mais recente e atualiza o local do esquema com o esquema mais recente, mesclando as novas colunas ao final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. O carregador automático dá suporte para diferentes modos de evolução do esquema, que são definidos na opção cloudFiles.schemaEvolutionMode.
Você pode usar as dicas de esquema para impor as informações de esquema que você conhece e espera em um esquema inferido. Quando você sabe que uma coluna é de um tipo de dados específico ou se deseja escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro), é possível fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL. Quando a coluna de dados resgatados está habilitada, os campos nomeados em um caso diferente daquele do esquema são carregados na coluna _rescued_data. Você pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Carregador Automático faz a leitura dos dados sem distinção entre maiúsculas e minúsculas.
Usage
Os exemplos a seguir usam o conjunto de dados do Wanderbricks para demonstrar a leitura e gravação de arquivos XML usando a API e o SQL do DataFrame do Spark.
Ler e gravar XML
Use a API do DataFrame para gravar revisões do Wanderbricks no XML e lê-las novamente.
Python
# Write Wanderbricks reviews to XML
df = spark.read.table("samples.wanderbricks.reviews")
df.write \
.format("xml") \
.option("rootTag", "reviews") \
.option("rowTag", "review") \
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
# Read the XML file back
df_read = spark.read \
.format("xml") \
.option("rowTag", "review") \
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df_read.show()
Scala (linguagem de programação)
// Write Wanderbricks reviews to XML
val df = spark.read.table("samples.wanderbricks.reviews")
df.write
.format("xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.save("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
// Read the XML file back
val dfRead = spark.read
.format("xml")
.option("rowTag", "review")
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
dfRead.show()
R
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
Você pode especificar manualmente o esquema ao ler dados:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.options(rowTag='review').xml('/Volumes/<catalog>/<schema>/<volume>/reviews.xml', schema=custom_schema)
df.show()
Scala (linguagem de programação)
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)))
val df = spark.read.option("rowTag", "review").schema(customSchema).xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.show()
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("rating", "integer"),
structField("comment", "string"))
df <- loadDF("/Volumes/<catalog>/<schema>/<volume>/reviews.xml", source = "xml", schema = customSchema, rowTag = "review")
saveDF(df, "/Volumes/<catalog>/<schema>/<volume>/newreviews.xml", "xml", "overwrite")
Ler e gravar XML com SQL
Use a DDL do SQL para criar uma tabela a partir de um arquivo XML. Azure Databricks infere tipos de coluna automaticamente.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
SELECT * FROM reviews;
Você também pode especificar nomes e tipos de colunas na DDL. Nesse caso, o esquema não é inferido automaticamente.
DROP TABLE IF EXISTS reviews;
CREATE TABLE reviews (_id string, author string, rating integer, comment string)
USING XML
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews.xml", rowTag "review");
Carregar XML usando COPY INTO
Use COPY INTO para carregar arquivos XML do armazenamento em nuvem em uma tabela Delta.
DROP TABLE IF EXISTS reviews;
CREATE TABLE IF NOT EXISTS reviews;
COPY INTO reviews
FROM "/Volumes/<catalog>/<schema>/<volume>/reviews.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'review')
COPY_OPTIONS ('mergeSchema' = 'true');
Ler XML com validação por linha
Use a opção rowValidationXSDPath para validar cada linha em um esquema XSD durante a leitura.
Python
df = (spark.read
.format("xml")
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews.xml"))
df.printSchema()
Scala (linguagem de programação)
val df = spark.read
.option("rowTag", "review")
.option("rowValidationXSDPath", xsdPath)
.xml("/Volumes/<catalog>/<schema>/<volume>/reviews.xml")
df.printSchema
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews.xml',
format => 'xml',
rowTag => 'review',
rowValidationXSDPath => '/Volumes/<catalog>/<schema>/<volume>/reviews.xsd'
)
Carregar o XML com o Carregador Automático
Use o Carregador Automático para ingerir continuamente arquivos XML do armazenamento em nuvem em uma tabela Delta com inferência e evolução automáticas de esquema.
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("reviews")
)
Scala (linguagem de programação)
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "review")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow())
.toTable("reviews")
Recursos adicionais
-
Ler e gravar dados XML usando a biblioteca
spark-xml: para usuários que já usaram a biblioteca XML do Spark de código aberto, consulte o guia de integração herdado. - Ler e gravar arquivos JSON: se os dados forem semi-estruturados, mas não XML, o JSON fornecerá inferência de esquema semelhante e suporte a dados aninhados com um formato mais simples.