Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Usa l'API Lakeflow Spark Declarative Pipelines con flussi per scrivere record trasformati tramite una pipeline in un sink di dati esterno. I sink di dati esterni includono tabelle gestite ed esterne di Unity Catalog e servizi di streaming di eventi, ad esempio Apache Kafka o Hub eventi di Azure. È anche possibile utilizzare destinazioni di dati per scrivere su origini dati personalizzate scrivendo codice Python per queste ultime.
Per una panoramica dei concetti relativi ai sink e di quando utilizzarli, consulta Sinks in Lakeflow Spark Declarative Pipelines.
Annotazioni
- L'API
sinkè disponibile solo per Python. - È possibile creare un sink personalizzato con l'API ForEachBatch. Vedere Usare ForEachBatch per scrivere in sink di dati arbitrari nelle pipeline.
Flusso di lavoro di sink
Man mano che i dati degli eventi vengono inseriti da un'origine di streaming nella pipeline, elabori e perfezioni questi dati nelle trasformazioni nella pipeline. Si usa quindi l'elaborazione del flusso di accodamento per trasmettere i record di dati trasformati in un sink. Puoi creare questo sink utilizzando la funzione create_sink(). Per altri dettagli sulla funzione create_sink, consultare la documentazione API sink.
Se si dispone di una pipeline che crea o elabora i dati degli eventi di streaming e prepara i record di dati per la scrittura, è possibile usare un sink.
L'implementazione di un sink è costituita da due passaggi:
- Creare il sink.
- Usare un flusso di aggiunta o un flusso di aggiornamento per scrivere i record preparati nel sink di destinazione.
Creare un sink
Databricks supporta diversi tipi di sink di destinazione nei quali si scrivono i record elaborati dai dati del flusso:
- Tabelle Delta di destinazione (incluse le tabelle di Unity Catalog gestite ed esterne)
- Sink di Apache Kafka
- Sink di Hub eventi di Azure
- Sink personalizzati scritti in Python, utilizzando fonti dati personalizzate in Python
Di seguito sono riportati esempi di configurazioni per i sink Delta, Kafka e Event Hubs di Azure e origini dati personalizzate Python.
Lavandini Delta
Per creare un sink Delta in base al percorso del file:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Per creare un sink Delta in base al nome della tabella usando un catalogo completamente qualificato e un percorso dello schema qualificato:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Sink Kafka e Hub eventi di Azure
Questo codice funziona sia per i sink apache Kafka che per Hub eventi di Azure.
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
credential_name è un riferimento a una credenziale del servizio Catalogo Unity. Per altre informazioni, vedere Usare le credenziali del servizio Catalogo Unity per connettersi ai servizi cloud esterni.
Origini dati personalizzate Python
Supponendo di avere un'origine dati personalizzata Python registrata come my_custom_datasource, il codice seguente può scrivere in tale origine dati.
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create Lakeflow SDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
Per informazioni dettagliate sulla creazione di origini dati personalizzate in Python, vedere Origini dati personalizzate pySpark.
Per maggiori dettagli sull'uso della funzione create_sink, fare riferimento alla documentazione dell'API sink .
Dopo aver creato il sink, è possibile iniziare a trasmettere i record elaborati verso il sink.
Scrivere in un sink con un flusso di aggiunta
Dopo aver creato il sink, il passaggio successivo consiste nello scrivere record elaborati specificandolo come destinazione per i record prodotti da un flusso di aggiunta. Per fare ciò, specificare il sink come valore di target nella decorazione append_flow.
- Per le tabelle gestite ed esterne di Unity Catalog, usare il formato
deltae specificare il percorso o il nome della tabella nelle opzioni. La pipeline deve essere configurata per l'uso di Unity Catalog. - Per gli argomenti di Apache Kafka, usare il formato
kafkae specificare il nome dell'argomento, le informazioni di connessione e le informazioni di autenticazione nelle opzioni. Queste sono le stesse opzioni supportate da un Sink di Kafka in Spark Structured Streaming. Vedere Configurare il writer di streaming strutturato Kafka. - Per Hub eventi di Azure, usare il formato
kafkae specificare il nome, le informazioni di connessione e le informazioni di autenticazione di Hub eventi nelle opzioni. Queste sono le stesse opzioni supportate in un sink di Event Hubs Spark Structured Streaming che utilizza l'interfaccia Kafka. Vedere Autenticazione.
Di seguito sono riportati esempi di come configurare i flussi per la scrittura in sink Delta, Kafka e Hub eventi di Azure con record elaborati dalla pipeline.
Lavandino delta
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Sink Kafka e Hub eventi di Azure
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
Il parametro value è obbligatorio per un sink di Hub eventi di Azure. Altri parametri, ad esempio key, partition, headerse topic sono facoltativi.
Per ulteriori dettagli sul decoratore append_flow, vedi Flussi predefiniti e flussi di aggiunta.
Limitazioni
È supportata solo l'API Python. SQL non è supportato.
Sono supportate solo le query di streaming. Le query batch non sono supportate.
Solo
append_floweupdate_flowpossono essere usati per scrivere nei ricevitori. Altri flussi, ad esempiocreate_auto_cdc_flow, non sono supportati e non è possibile usare un sink in una definizione del set di dati della pipeline. Ad esempio, il codice seguente non è supportato:@table("from_sink_table") def fromSink(): return read_stream("my_sink")Per i Delta sink, il nome della tabella deve essere completamente qualificato. In particolare, per le tabelle esterne gestite dal catalogo Unity, il nome della tabella deve essere nel formato
<catalog>.<schema>.<table>. Per il metastore Hive, deve essere nel formato<schema>.<table>.L'esecuzione di un aggiornamento completo non elimina i dati dei risultati calcolati in precedenza nei sink. Ciò significa che tutti i dati elaborati vengono aggiunti al sink e i dati esistenti non vengono modificati.
Le aspettative della pipeline non sono supportate.
Il controllo uscita serverless supporta solo i connettori sink Kafka e Delta Lake. Vedere Che cos'è il controllo in uscita serverless?.