CSV (コンマ区切り値) は、データ交換、ETL パイプライン、汎用データ ストレージに広く使用されているプレーンテキストの表形式です。 Azure Databricksでは、スキーマ推論、圧縮、形式が正しくないレコード処理、復旧されたデータなど、Apache Spark での読み取りと書き込みの両方に CSV がサポートされています。
注
Databricks では、SQL ユーザーが CSV ファイルを読み取るための read_files テーブル値関数 が推奨されます。
read_files は、Databricks Runtime 13.3 LTS 以降で使用できます。
一時ビューを使うこともできます。 一時ビューまたは read_files を使わずに、SQL を使って CSV データを直接読み取る場合は、次の制限が適用されます。
- データ ソース オプションを指定することはできません。
- データ のスキーマを指定 することはできません。
前提条件
Azure Databricksでは、CSV ファイルを使用するために追加の構成は必要ありません。 ただし、CSV ファイルをストリーミングするには、 自動ローダーが必要です。
オプション
.option() と .options() の DataFrameReader メソッドおよび DataFrameWriter メソッドを使用して、CSV データ ソースを構成します。 サポートされているオプションの完全な一覧については、「 DataFrameReader CSV オプション と DataFrameWriter CSV オプション」を参照してください。
Usage
次の例は、CSV ファイルの読み取りと書き込み、スキーマの指定、および形式が正しくないレコードの処理を示しています。
CSV ファイルを読み取る
次の例では、 Wanderbricks サンプル データセットを使用します。 レビュー データを CSV に書き込み、読み戻します。
Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
Scala
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
R
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
SQL を使用して CSV ファイルを読み取る
次の SQL 例では、read_files を使って CSV ファイルを読み取ります。
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
スキーマを指定する
CSV ファイルのスキーマがわかっている場合は、schema オプションを使用して、目的のスキーマをCSV リーダーに指定できます。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
列のサブセットを読み取る
CSV パーサーの動作は、読み取る列によって異なります。 指定したスキーマがファイル レイアウトと一致しない場合、アクセスされる列によって結果が大きく異なる場合があります。 CSV には列名メタデータがないため、Spark はスキーマ フィールドを位置によって列にマップします。スキーマが一致しない場合は、値が間違ったフィールドにシフトされます。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
形式が正しくない CSV レコードを処理する
指定されたスキーマを使用して CSV ファイルを読み取るときに、ファイル内のデータがスキーマと一致しない可能性があります。 たとえば、市の名前を含むフィールドは、整数として解析されません。 結果は、パーサーが実行されるモードによって異なります。
-
PERMISSIVE(既定値): 正しく解析できなかったフィールドに対して null が挿入される。 -
DROPMALFORMED: 解析できなかったフィールドを含む行をドロップする。 -
FAILFAST: 形式が正しくないデータが検出された場合に、読み取りを中止する。
モードを設定するには mode オプションを使用します。
Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
Scala
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
PERMISSIVE モードでは、次のいずれかの方法を使用して、正しく解析できなかった行を検査できます。
- オプション
badRecordsPathにカスタム パスを指定して、破損したレコードをファイルに記録できます。 - 列
_corrupt_recordを DataFrameReader に指定されるスキーマに追加して、結果の DataFrame 内の破損したレコードを確認できます。
注
badRecordsPath オプションは _corrupt_record よりも優先されます。つまり、指定されたパスに書き込まれた形式に誤りがある行は、結果の DataFrame に表示されません。
形式に誤りがあるレコードに対する既定の動作は、復旧されたデータ列を使用する場合は変更されます。
_corrupt_recordを使用して形式が正しくない行を検査するには、スキーマに追加し、null 以外の値でフィルター処理します。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
復旧されたデータ列を有効にする
注
この機能は、Databricks Runtime 8.3 以降でサポートされています。
PERMISSIVE モードを使用する場合は、レコード内の1つ以上のフィールドに以下のいずれかの問題があるため、解析されなかったデータを捕捉するために、復旧データ列を有効にすることができます。
- 指定されたスキーマに存在しない。
- 指定されたスキーマのデータ型と一致しない。
- 指定されたスキーマのフィールド名と大文字と小文字の区別が一致しない。
復旧されたデータ列は、復旧された列と、レコードのソース ファイル パスを含む JSON ドキュメントとして返されます。
復旧されたデータ列を有効にするには、読み取り時に rescuedDataColumn オプションを列名に設定します。
Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
Scala
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
復旧されたデータ列からソース ファイルのパスを削除するには、次の値を設定します。
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
CSV パーサーでは、レコードを解析するときに、PERMISSIVE、DROPMALFORMED、FAILFAST の 3 つのモードがサポートされます。
rescuedDataColumn と併用することで、データ型の不一致が発生しても、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーが発生したりすることはありません。 削除されるかエラーが発生するのは、破損したレコード(不完全または形式が誤っているCSV)だけです。
rescuedDataColumn を PERMISSIVE モードで使用する場合、破損したレコードには次の規則が適用されます。
- ファイルの最初の行 (ヘッダー行またはデータ行) で、予想される行の長さが設定されます。
- 列数が異なる行は不完全と見なされます。
- データ型の不一致は、破損したレコードとは見なされません。
- 不完全で形式に誤りがある CSV レコードのみが破損したレコードと見なされ、
_corrupt_record列またはbadRecordsPathに記録されます。
その他のリソース
- Parquet ファイルの読み取りと書き込み: ワークロードでクエリパフォーマンスの向上や効率的なストレージが必要な場合、Parquet の列レイアウトは CSV のプレーンテキスト形式よりも大きな利点を提供します。