CSV ファイルの読み取りと書き込み

CSV (コンマ区切り値) は、データ交換、ETL パイプライン、汎用データ ストレージに広く使用されているプレーンテキストの表形式です。 Azure Databricksでは、スキーマ推論、圧縮、形式が正しくないレコード処理、復旧されたデータなど、Apache Spark での読み取りと書き込みの両方に CSV がサポートされています。

Databricks では、SQL ユーザーが CSV ファイルを読み取るための read_files テーブル値関数 が推奨されます。 read_files は、Databricks Runtime 13.3 LTS 以降で使用できます。

一時ビューを使うこともできます。 一時ビューまたは read_files を使わずに、SQL を使って CSV データを直接読み取る場合は、次の制限が適用されます。

前提条件

Azure Databricksでは、CSV ファイルを使用するために追加の構成は必要ありません。 ただし、CSV ファイルをストリーミングするには、 自動ローダーが必要です。

オプション

.option().options()DataFrameReader メソッドおよび DataFrameWriter メソッドを使用して、CSV データ ソースを構成します。 サポートされているオプションの完全な一覧については、「 DataFrameReader CSV オプションDataFrameWriter CSV オプション」を参照してください。

Usage

次の例は、CSV ファイルの読み取りと書き込み、スキーマの指定、および形式が正しくないレコードの処理を示しています。

CSV ファイルを読み取る

次の例では、 Wanderbricks サンプル データセットを使用します。 レビュー データを CSV に書き込み、読み戻します。

Python

# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

# Read the CSV file into a DataFrame
df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()

Scala

// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

// Read the CSV file into a DataFrame
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()

R

df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)

SQL を使用して CSV ファイルを読み取る

次の SQL 例では、read_files を使って CSV ファイルを読み取ります。

-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
  'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

スキーマを指定する

CSV ファイルのスキーマがわかっている場合は、schema オプションを使用して、目的のスキーマをCSV リーダーに指定できます。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int, comment string'
)

列のサブセットを読み取る

CSV パーサーの動作は、読み取る列によって異なります。 指定したスキーマがファイル レイアウトと一致しない場合、アクセスされる列によって結果が大きく異なる場合があります。 CSV には列名メタデータがないため、Spark はスキーマ フィールドを位置によって列にマップします。スキーマが一致しない場合は、値が間違ったフィールドにシフトされます。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read only a subset of columns by specifying a partial schema
schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True)
])

df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true)
))

val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  schema => 'review_id string, rating int'
)

形式が正しくない CSV レコードを処理する

指定されたスキーマを使用して CSV ファイルを読み取るときに、ファイル内のデータがスキーマと一致しない可能性があります。 たとえば、市の名前を含むフィールドは、整数として解析されません。 結果は、パーサーが実行されるモードによって異なります。

  • PERMISSIVE (既定値): 正しく解析できなかったフィールドに対して null が挿入される。
  • DROPMALFORMED: 解析できなかったフィールドを含む行をドロップする。
  • FAILFAST: 形式が正しくないデータが検出された場合に、読み取りを中止する。

モードを設定するには mode オプションを使用します。

Python

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)

Scala

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE'
)

PERMISSIVE モードでは、次のいずれかの方法を使用して、正しく解析できなかった行を検査できます。

  • オプション badRecordsPath にカスタム パスを指定して、破損したレコードをファイルに記録できます。
  • _corrupt_record を DataFrameReader に指定されるスキーマに追加して、結果の DataFrame 内の破損したレコードを確認できます。

badRecordsPath オプションは _corrupt_record よりも優先されます。つまり、指定されたパスに書き込まれた形式に誤りがある行は、結果の DataFrame に表示されません。

形式に誤りがあるレコードに対する既定の動作は、復旧されたデータ列を使用する場合は変更されます。

_corrupt_recordを使用して形式が正しくない行を検査するには、スキーマに追加し、null 以外の値でフィルター処理します。

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
  StructField("review_id", StringType(), True),
  StructField("rating", IntegerType(), True),
  StructField("comment", StringType(), True),
  StructField("_corrupt_record", StringType(), True)
])

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("review_id", StringType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

df.filter(df("_corrupt_record").isNotNull).show()

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  mode => 'PERMISSIVE',
  schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL

復旧されたデータ列を有効にする

この機能は、Databricks Runtime 8.3 以降でサポートされています。

PERMISSIVE モードを使用する場合は、レコード内の1つ以上のフィールドに以下のいずれかの問題があるため、解析されなかったデータを捕捉するために、復旧データ列を有効にすることができます。

  • 指定されたスキーマに存在しない。
  • 指定されたスキーマのデータ型と一致しない。
  • 指定されたスキーマのフィールド名と大文字と小文字の区別が一致しない。

復旧されたデータ列は、復旧された列と、レコードのソース ファイル パスを含む JSON ドキュメントとして返されます。

復旧されたデータ列を有効にするには、読み取り時に rescuedDataColumn オプションを列名に設定します。

Python

df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

Scala

val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")

SQL

SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
  format => 'csv',
  header => true,
  rescuedDataColumn => '_rescued_data'
)

復旧されたデータ列からソース ファイルのパスを削除するには、次の値を設定します。

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

CSV パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と併用することで、データ型の不一致が発生しても、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーが発生したりすることはありません。 削除されるかエラーが発生するのは、破損したレコード(不完全または形式が誤っているCSV)だけです。

rescuedDataColumnPERMISSIVE モードで使用する場合、破損したレコードには次の規則が適用されます。

  • ファイルの最初の行 (ヘッダー行またはデータ行) で、予想される行の長さが設定されます。
  • 列数が異なる行は不完全と見なされます。
  • データ型の不一致は、破損したレコードとは見なされません。
  • 不完全で形式に誤りがある CSV レコードのみが破損したレコードと見なされ、_corrupt_record 列または badRecordsPath に記録されます。

その他のリソース

  • Parquet ファイルの読み取りと書き込み: ワークロードでクエリパフォーマンスの向上や効率的なストレージが必要な場合、Parquet の列レイアウトは CSV のプレーンテキスト形式よりも大きな利点を提供します。