Streaming da Apache Pulsar

Importante

Questa funzionalità è disponibile in anteprima pubblica.

In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.

Structured Streaming offre una semantica di elaborazione exactly-once per i dati letti da sorgenti Pulsar.

Esempio di sintassi

Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Per leggere dai topic di Pulsar, è necessario fornire un service.url e una delle seguenti opzioni:

  • topic
  • topics
  • topicsPattern

Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.

Eseguire l'autenticazione a Pulsar

Azure Databricks supporta l'autenticazione tramite truststore e keystore per Pulsar. Databricks consiglia di usare i segreti per archiviare i dettagli di configurazione.

Le opzioni di configurazione del flusso disponibili includono quanto segue:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se il flusso utilizza un PulsarAdmin, è necessario impostare le seguenti opzioni:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Example

L'esempio seguente illustra la configurazione delle opzioni di autenticazione:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Schema Pulsar

Quando si legge da Pulsar, lo schema delle righe dipende dagli schemi degli argomenti dell'origine.

  • Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
  • Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una value colonna.
  • Se si configura il flusso per leggere più argomenti con schemi diversi, impostare allowDifferentTopicSchemas per caricare il contenuto non elaborato in una value colonna.

I record Pulsar hanno i campi di metadati seguenti:

colonna Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurare le opzioni per la lettura in streaming Pulsar

Configurare tutte le opzioni seguenti con la sintassi .option("<optionName>", "<optionValue>") per i flussi di lettura. È anche possibile configurare l'autenticazione usando .options(). Vedi Autenticarsi a Pulsar.

Nella tabella seguente vengono descritte le configurazioni necessarie per Pulsar. È necessario specificare solo una delle opzioni topico topicstopicsPattern.

Opzione Valore predefinito Descrizione
service.url Nessuno La configurazione di Pulsar serviceUrl per il servizio Pulsar.
topic Nessuno Stringa del nome dell'argomento da utilizzare.
topics Nessuno Elenco delimitato da virgole degli argomenti da consumare.
topicsPattern Nessuno Una stringa regex Java per trovare corrispondenze con i topic da leggere.

La tabella seguente descrive altre opzioni supportate per Pulsar:

Opzione Valore predefinito Descrizione
predefinedSubscription Nessuno Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark.
subscriptionPrefix Nessuno Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark.
pollTimeoutMs 120000 Tempo di attesa per la lettura dei messaggi provenienti da Pulsar, in millisecondi.
waitingForNonExistedTopic false Indica se il connettore deve attendere fino a quando non vengono creati gli argomenti desiderati.
failOnDataLoss true Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione).
allowDifferentTopicSchemas false Se vengono letti più argomenti con schemi diversi, usare questa opzione per disattivare la deserializzazione automatica del valore dell'argomento basato su schema. Quando questo è true, vengono restituiti solo i valori grezzi.
startingOffsets latest Se latest, il lettore legge i record più recenti dopo l'avvio. Se earliest, il lettore legge dall'offset più antico. È anche possibile specificare una stringa JSON per un offset specifico.
maxBytesPerTrigger Nessuno Limite flessibile per il numero massimo di byte da elaborare per micro batch. Se si specifica questa opzione, è necessario specificare anche admin.url.
admin.url Nessuno La configurazione di Pulsar serviceHttpUrl. Obbligatorio quando maxBytesPerTrigger viene specificato.

È anche possibile specificare qualsiasi configurazione client, amministratore e lettore Pulsar usando i modelli seguenti:

Modello Opzioni di configurazione
pulsar.client.* Configurazione del client Pulsar
pulsar.admin.* Configurazione dell'amministratore pulsar
pulsar.reader.* Configurazione del lettore Pulsar

Crea offset iniziali in JSON

Per usare un ID messaggio personalizzato che specifica un offset, come JSON, con l'opzione startingOffsets , vedere l'esempio seguente:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()