Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
CSV (valores separados por vírgulas) é um formato tabular de texto sem formatação amplamente usado para troca de dados, pipelines DE ETL e armazenamento de dados de uso geral. Azure Databricks dá suporte ao CSV para leitura e gravação com o Apache Spark, incluindo inferência de esquema, compactação, manipulação de registros malformados e dados resgatados.
Observação
O Databricks recomenda a read_files função com valor de tabela para os usuários do SQL lerem arquivos CSV.
read_files está disponível no Databricks Runtime 13.3 LTS e superior.
Você também pode usar um modo de exibição temporário. Se você usar o SQL para ler dados CSV diretamente sem usar modos de exibição temporários ou read_files, as seguintes limitações se aplicam:
- Você não pode especificar opções de fonte de dados.
- Você não pode especificar o esquema para os dados.
Pré-requisitos
Azure Databricks não requer configuração adicional para usar arquivos CSV. No entanto, para transmitir arquivos CSV, você precisa do Carregador Automático.
Opções
Use os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar fontes de dados CSV. Para obter uma lista completa de opções com suporte, consulte DataFrameReader opções de CSV e DataFrameWriter opções de CSV.
Usage
Os exemplos a seguir demonstram a leitura e gravação de arquivos CSV, a especificação de esquemas e o tratamento de registros malformados.
Leitura de arquivos CSV
O exemplo a seguir usa o conjunto de dados de exemplo do Wanderbricks . Ele grava os dados de revisão no CSV e, em seguida, lê-os novamente.
Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
Scala
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
R
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
Ler arquivos CSV usando SQL
O exemplo de SQL a seguir lê um arquivo CSV usando read_files.
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Especificar um esquema
Quando o esquema do arquivo CSV é conhecido, você pode especificar o esquema desejado para o leitor CSV com a opção schema.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
Ler um subconjunto de colunas
O comportamento do analisador CSV depende de quais colunas são lidas. Se o esquema especificado não corresponder ao layout do arquivo, os resultados poderão diferir consideravelmente dependendo de quais colunas são acessadas. O CSV não tem metadados de nome de coluna, portanto, o Spark mapeia campos de esquema para colunas por posição – um esquema incompatível desloca valores para os campos errados.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
Manipular registros CSV malformados
Ao ler arquivos CSV com um esquema especificado, é possível que os dados nos arquivos não correspondam ao esquema. Por exemplo, um campo que contenha o nome da cidade não será analisado como inteiro. As consequências dependem do modo como o analisador é executado:
-
PERMISSIVE(padrão): nulos são inseridos para campos que não puderam ser analisados corretamente -
DROPMALFORMED: remove linhas que contêm campos que não puderam ser analisados -
FAILFAST: anula a leitura se algum dado malformado for encontrado
Para definir o modo, use a opção mode.
Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
Scala
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
No modo PERMISSIVE, é possível inspecionar as linhas que não puderam ser analisadas corretamente usando um dos seguintes métodos:
- Você pode fornecer um caminho personalizado para a opção
badRecordsPathde gravar registros corrompidos em um arquivo. - Você pode adicionar a coluna
_corrupt_recordao esquema fornecido ao DataFrameReader para examinar registros corrompidos no DataFrame resultante.
Observação
A opção badRecordsPath tem precedência sobre _corrupt_record, o que significa que linhas malformadas gravadas no caminho fornecido não aparecem no DataFrame resultante.
O comportamento padrão para registros malformados muda ao utilizar a coluna de dados resgatados.
Para inspecionar linhas malformadas usando _corrupt_record, adicione-as ao esquema e filtre em valores não nulos:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
Habilitar a coluna de dados resgatados
Observação
Esse recurso tem suporte no Databricks Runtime 8.3 e superior.
Ao usar o PERMISSIVE modo, você pode habilitar a coluna de dados resgatada para capturar todos os dados que não foram analisados porque um ou mais campos em um registro têm um dos seguintes problemas:
- Ausente do esquema fornecido.
- Não corresponde ao tipo de dados do esquema fornecido.
- Tem uma incompatibilidade de maiúsculas e minúsculas com os nomes de campo no esquema fornecido.
A coluna de dados resgatada é retornada como um documento JSON que contém as colunas que foram resgatadas e o caminho do arquivo de origem do registro.
Para habilitar a coluna de dados resgatada, defina a opção rescuedDataColumn como um nome de coluna ao ler:
Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
Scala
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
Para remover o caminho do arquivo de origem da coluna de dados resgatada, defina:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
O analisador CSV dá suporte a três modos ao analisar registros: PERMISSIVE, DROPMALFORMED e FAILFAST. Quando usado junto a rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registros sejam removidos no modo DROPMALFORMED ou geram um erro no modo FAILFAST. Somente registros corrompidos, como CSV incompletos ou malformados, são descartados ou geram erros.
Quando rescuedDataColumn é usado no modo PERMISSIVE, as seguintes regras se aplicam a registros corrompidos:
- A primeira linha do arquivo (uma linha de cabeçalho ou uma linha de dados) define o comprimento de linha esperado.
- Uma linha com um número diferente de colunas é considerada incompleta.
- As incompatibilidades de tipo de dados não são consideradas registros corrompidos.
- Somente registros CSV incompletos e malformados são considerados corrompidos e registrados na coluna
_corrupt_recordoubadRecordsPath.
Recursos adicionais
- Ler e gravar arquivos Parquet: se sua carga de trabalho exigir melhor desempenho de consulta ou armazenamento mais eficiente, o layout columnar do Parquet oferecerá vantagens significativas em relação ao formato de texto sem formatação do CSV.