プロトコル バッファーの読み取りと書き込みを行う

プロトコル バッファー (protobuf) は、Google によって開発された言語に依存しないバイナリ シリアル化形式です。 Azure Databricks ユーザーがこの問題に最もよく遭遇するのは、Apache Kafka などのイベント ストリーミング システムからのバイナリ エンコードされたレコードを処理するときです。 Azure Databricksでは、from_protobuf関数とto_protobuf関数を使用して Apache Spark で protobuf データの読み取りと書き込みをサポートしています。この関数は、ストリーミング ワークロードとバッチ ワークロードの両方に対してバイナリの protobuf と Spark SQL 構造体の型を変換します。

前提条件

Protobuf 関数には、Databricks Runtime 12.2 LTS 以降が必要です。

関数の構文

from_protobufを使用してバイナリ列を構造体にキャストし、to_protobufを使用して構造体列をバイナリにキャストします。 descFilePath引数で識別される記述子ファイルか、options引数で指定されたスキーマ レジストリを指定する必要があります。 オプションの完全な一覧については、 Protobuf を参照してください。

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

スカラ (プログラミング言語)

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

オプション

options 引数を使用して、from_protobufto_protobuf にオプションを渡します。 サポートされているオプションの完全な一覧については、 Protobuf を参照してください。

スキーマ レジストリのオプション

次のオプションは、スキーマ レジストリの使用に固有のオプションであり、一般的なオプション リファレンスでは説明されていません。

オプション 必須 デフォルト Description
schema.registry.schema.evolution.mode いいえ "restart" 受信レコードで新しいスキーマ ID が検出された場合のスキーマ変更の処理方法。 "restart" は、 UnknownFieldExceptionを使用してクエリを終了します。変更を取得できない場合に再開するようにジョブを構成します。 "none" はスキーマ ID の変更を無視し、新しいレコードを元のスキーマで解析します。
confluent.schema.registry.<option> いいえ Confluent Schema Registry client の任意のオプションは、プレフィックス "confluent.schema.registry" を使用して指定できます。 たとえば、 "confluent.schema.registry.basic.auth.credentials.source""USER_INFO" に設定し、 "confluent.schema.registry.basic.auth.user.info""<KEY>:<SECRET>" に設定して基本認証を構成します。

Usage

次の例では 、Wanderbricks データセット を使用して、Apache Spark 構造体を to_protobuf() を使用してバイナリ protobuf にシリアル化し、 from_protobuf()でバイナリ protobuf レコードを逆シリアル化する方法を示します。

Confluent スキーマ レジストリで protobuf を使用する

Azure Databricks では、Confluen スキーマ レジストリを使用して Protobuf を定義できます。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

スカラ (プログラミング言語)

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

外部 Confluent スキーマ レジストリに対して認証する

外部 Confluent スキーマ レジストリに対して認証するには、スキーマ レジストリ オプションを更新して、認証資格情報と API キーを含めます。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

スカラ (プログラミング言語)

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity Catalog ボリュームでトラストストアおよびキーストア ファイルを使用する

Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリュームのトラストストアおよびキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の例に従って、スキーマ レジストリ オプションを更新します。

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

スカラ (プログラミング言語)

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

記述子ファイルで Protobuf を使用する

コンピューティング クラスターで使用できる protobuf 記述子ファイルを参照することもできます。 その場所に応じた、ファイルを読み取る適切なアクセス許可があることを確認してください。

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

スカラ (プログラミング言語)

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

その他のリソース