Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Questa funzionalità è disponibile in anteprima pubblica.
In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.
Structured Streaming offre una semantica di elaborazione exactly-once per i dati letti da sorgenti Pulsar.
Esempio di sintassi
Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Per leggere dai topic di Pulsar, è necessario fornire un service.url e una delle seguenti opzioni:
topictopicstopicsPattern
Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.
Eseguire l'autenticazione a Pulsar
Azure Databricks supporta l'autenticazione tramite truststore e keystore per Pulsar. Databricks consiglia di usare i segreti per archiviare i dettagli di configurazione.
Le opzioni di configurazione del flusso disponibili includono quanto segue:
pulsar.client.authPluginClassNamepulsar.client.authParamspulsar.client.useKeyStoreTlspulsar.client.tlsTrustStoreTypepulsar.client.tlsTrustStorePathpulsar.client.tlsTrustStorePassword
Se il flusso utilizza un PulsarAdmin, è necessario impostare le seguenti opzioni:
pulsar.admin.authPluginClassNamepulsar.admin.authParams
Example
L'esempio seguente illustra la configurazione delle opzioni di autenticazione:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Schema Pulsar
Quando si legge da Pulsar, lo schema delle righe dipende dagli schemi degli argomenti dell'origine.
- Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
- Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una
valuecolonna. - Se si configura il flusso per leggere più argomenti con schemi diversi, impostare
allowDifferentTopicSchemasper caricare il contenuto non elaborato in unavaluecolonna.
I record Pulsar hanno i campi di metadati seguenti:
| colonna | Tipo |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configurare le opzioni per la lettura in streaming Pulsar
Configurare tutte le opzioni seguenti con la sintassi .option("<optionName>", "<optionValue>") per i flussi di lettura. È anche possibile configurare l'autenticazione usando .options(). Vedi Autenticarsi a Pulsar.
Nella tabella seguente vengono descritte le configurazioni necessarie per Pulsar. È necessario specificare solo una delle opzioni topico topicstopicsPattern.
| Opzione | Valore predefinito | Descrizione |
|---|---|---|
service.url |
Nessuno | La configurazione di Pulsar serviceUrl per il servizio Pulsar. |
topic |
Nessuno | Stringa del nome dell'argomento da utilizzare. |
topics |
Nessuno | Elenco delimitato da virgole degli argomenti da consumare. |
topicsPattern |
Nessuno | Una stringa regex Java per trovare corrispondenze con i topic da leggere. |
La tabella seguente descrive altre opzioni supportate per Pulsar:
| Opzione | Valore predefinito | Descrizione |
|---|---|---|
predefinedSubscription |
Nessuno | Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark. |
subscriptionPrefix |
Nessuno | Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark. |
pollTimeoutMs |
120000 | Tempo di attesa per la lettura dei messaggi provenienti da Pulsar, in millisecondi. |
waitingForNonExistedTopic |
false |
Indica se il connettore deve attendere fino a quando non vengono creati gli argomenti desiderati. |
failOnDataLoss |
true |
Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione). |
allowDifferentTopicSchemas |
false |
Se vengono letti più argomenti con schemi diversi, usare questa opzione per disattivare la deserializzazione automatica del valore dell'argomento basato su schema. Quando questo è true, vengono restituiti solo i valori grezzi. |
startingOffsets |
latest |
Se latest, il lettore legge i record più recenti dopo l'avvio. Se earliest, il lettore legge dall'offset più antico. È anche possibile specificare una stringa JSON per un offset specifico. |
maxBytesPerTrigger |
Nessuno | Limite flessibile per il numero massimo di byte da elaborare per micro batch. Se si specifica questa opzione, è necessario specificare anche admin.url. |
admin.url |
Nessuno | La configurazione di Pulsar serviceHttpUrl. Obbligatorio quando maxBytesPerTrigger viene specificato. |
È anche possibile specificare qualsiasi configurazione client, amministratore e lettore Pulsar usando i modelli seguenti:
| Modello | Opzioni di configurazione |
|---|---|
pulsar.client.* |
Configurazione del client Pulsar |
pulsar.admin.* |
Configurazione dell'amministratore pulsar |
pulsar.reader.* |
Configurazione del lettore Pulsar |
Crea offset iniziali in JSON
Per usare un ID messaggio personalizzato che specifica un offset, come JSON, con l'opzione startingOffsets , vedere l'esempio seguente:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()