Leia e escreva ficheiros CSV

CSV (valores separados por vírgulas) é um formato tabular em texto simples amplamente utilizado para troca de dados, pipelines ETL e armazenamento de dados de uso geral. O Azure Databricks suporta CSV tanto para leitura como para escrita com o Apache Spark, incluindo inferência de esquema, compressão, tratamento de registos malformados e dados resgatados.

Nota

O Databricks recomenda a read_files função com valor de tabela para que os usuários do SQL leiam arquivos CSV. read_files está disponível no Databricks Runtime 13.3 LTS e superior.

Você também pode usar um modo de exibição temporário. Se você usar SQL para ler dados CSV diretamente sem usar exibições temporárias ou read_files, as seguintes limitações se aplicam:

Pré-requisitos

O Azure Databricks não requer configuração adicional para utilizar ficheiros CSV. No entanto, para transmitir ficheiros CSV, precisas do Auto Loader.

Opções

Utilize os métodos .option() e .options() de DataFrameReader e DataFrameWriter para configurar fontes de dados CSV. Para uma lista completa de opções suportadas, consulte DataFrameReader opções CSV e DataFrameWriter opções CSV.

Usage

Os exemplos seguintes demonstram a leitura e escrita de ficheiros CSV, especificação de esquemas e gestão de registos malformados.

Ler ficheiros CSV

O exemplo seguinte utiliza o conjunto de dados de exemplo Wanderbricks . Escreve dados de avaliações no CSV e depois lê-os.

Python

# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

# Read the CSV file into a DataFrame
df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()

Scala

// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

// Read the CSV file into a DataFrame
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()

R

df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)

Leia ficheiros CSV usando SQL

O exemplo SQL a seguir lê um ficheiro CSV usando read_files.

-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
  'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Especifique um esquema

Quando o esquema do arquivo CSV é conhecido, você pode especificar o esquema desejado para o leitor CSV com a schema opção.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int, comment string'
)

Leia um subconjunto de colunas

O comportamento do parser CSV depende das colunas que são lidas. Se o esquema especificado não corresponder ao layout do ficheiro, os resultados podem variar consideravelmente dependendo das colunas acedidas. O CSV não tem metadados de nomes de colunas, por isso o Spark mapeia os campos do esquema para colunas por posição — um esquema desalinhado desloca os valores para os campos errados.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read only a subset of columns by specifying a partial schema
schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int'
)

Processar registos CSV malformados

Ao ler arquivos CSV com um esquema especificado, é possível que os dados nos arquivos não correspondam ao esquema. Por exemplo, um campo contendo o nome da cidade não será analisado como um inteiro. As consequências dependem do modo em que o analisador é executado:

  • PERMISSIVE (padrão): nulos são inseridos para campos que não puderam ser analisados corretamente
  • DROPMALFORMED: descarta linhas que contêm campos que não puderam ser analisados
  • FAILFAST: aborta a leitura se algum dado malformado for encontrado

Para definir o modo, use a mode opção.

Python

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)

Scala

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE'
)

No modo PERMISSIVE, é possível inspecionar as linhas que não puderam ser analisadas corretamente, usando um dos seguintes métodos:

  • Você pode fornecer um caminho personalizado para a opção badRecordsPath de gravar registros corrompidos em um arquivo.
  • Você pode adicionar a coluna _corrupt_record ao esquema fornecido ao DataFrameReader para revisar registros corrompidos no DataFrame resultante.

Nota

A badRecordsPath opção tem precedência sobre _corrupt_record, o que significa que as linhas malformadas gravadas no caminho fornecido não aparecem no DataFrame resultante.

O comportamento padrão para registros malformados muda ao usar a coluna de dados resgatados.

Para inspecionar linhas malformadas usando _corrupt_record, adicione-a ao esquema e filtre valores não nulos:

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True),
  StructField("_corrupt_record", StringType(), True)
])

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

df.filter(df("_corrupt_record").isNotNull).show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL

Ativar a coluna de dados resgatados

Nota

Esta funcionalidade é suportada no Databricks Runtime 8.3 e superiores.

Ao usar o PERMISSIVE modo, você pode habilitar a coluna de dados resgatados para capturar quaisquer dados que não foram analisados porque um ou mais campos em um registro têm um dos seguintes problemas:

  • Ausente do esquema fornecido.
  • Não corresponde ao tipo de dados do esquema fornecido.
  • Há uma incompatibilidade de maiúsculas e minúsculas nos nomes de campo do esquema fornecido.

A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro.

Para ativar a coluna de dados recuperados, defina a opção rescuedDataColumn com um nome de coluna durante a leitura:

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

Scala

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  rescuedDataColumn => '_rescued_data'
)

Para remover o caminho do ficheiro de origem da coluna de dados resgatados, defina:

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

O analisador CSV suporta três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando usado em conjunto com o rescuedDataColumn, as incompatibilidades de tipo de dados não fazem com que os registos sejam descartados no modo DROPMALFORMED ou gerem um erro no modo FAILFAST. Somente registros corrompidos, ou seja, CSV incompletos ou malformados, são descartados ou geram erros.

Quando rescuedDataColumn usado no PERMISSIVE modo, as seguintes regras se aplicam a registros corrompidos:

  • A primeira linha do arquivo (uma linha de cabeçalho ou uma linha de dados) define o comprimento de linha esperado.
  • Uma linha com um número diferente de colunas é considerada incompleta.
  • Incompatibilidades de tipo de dados não são consideradas registros corrompidos.
  • Somente registros CSV incompletos e malformados são considerados corrompidos e registrados na _corrupt_record coluna ou badRecordsPath.

Recursos adicionais

  • Leia e escreva ficheiros Parquet: Se a sua carga de trabalho requer melhor desempenho de consulta ou armazenamento mais eficiente, o layout colunar do Parquet oferece vantagens significativas em relação ao formato de texto simples do CSV.