Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Lo Structured Streaming è un motore di elaborazione dei flussi scalabile e tollerante ai guasti basato su Spark. Considera un flusso di dati live come una tabella a cui vengono accodate continuamente nuove righe. Structured Streaming supporta origini file predefinite, ad esempio CSV, JSON, ORC e Parquet, insieme a servizi di messaggistica come Kafka e Hub eventi di Azure.
Questo articolo illustra la configurazione di un'origine di streaming, ad esempio Hub eventi di Azure, l'inserimento di dati di streaming in una tabella Delta lakehouse, l'ottimizzazione delle prestazioni di scrittura con il partizionamento e l'invio in batch di eventi e l'esecuzione di processi di streaming in modo affidabile nell'ambiente di produzione.
Configurare un'origine di streaming
Per trasmettere i dati in un lakehouse, configurare innanzitutto una connessione alla tua sorgente di streaming. Hub eventi di Azure è una scelta comune. Usare il connettore hub eventi di Azure per Apache Spark per connettere l'applicazione Spark a Hub eventi di Azure.
Una configurazione di base di Hub eventi richiede il nome dello spazio dei nomi di Hub eventi, il nome dell'hub, il nome della chiave di accesso condiviso e il gruppo di consumer.
Un gruppo di consumatori è una visione complessiva di un hub eventi. I gruppi di consumer consentono a più applicazioni consumer di avere una visualizzazione separata del flusso di eventi e di leggere il flusso in modo indipendente, secondo il proprio ritmo e con i propri offset.
Le partizioni in Hub eventi consentono di elaborare volumi elevati di eventi in parallelo. Un singolo processore ha una capacità limitata per la gestione degli eventi al secondo, mentre più processori possono funzionare in parallelo tra partizioni.
Se vengono usate troppe partizioni con una frequenza di inserimento ridotta, i lettori di partizioni gestiscono una piccola parte di dati, causando un'elaborazione non ottimale. Il numero ideale di partizioni dipende dalla velocità di elaborazione desiderata. Quando si incrementa il numero di unità di throughput nel proprio spazio dei nomi, potrebbe essere utile avere partizioni aggiuntive per consentire ai lettori concorrenti di raggiungere il loro throughput massimo.
Verifica il miglior numero di partizioni per il tuo scenario di throughput. Gli scenari con velocità effettiva elevata usano in genere 32 o più partizioni.
Tabella Delta come destinazione di streaming
Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID (atomicità, coerenza, isolamento e durabilità) su Data Lake Storage. In Fabric Data Engineering, Delta Lake supporta upserts, compattazione dei dati, viaggio nel tempo, evoluzione dello schema e archiviazione in formato aperto.
Con delta come formato di output in writeStream, i dati di streaming vengono trasmessi direttamente in una tabella Delta. L'esempio seguente legge da Hub eventi, analizza il corpo del messaggio e scrive in una tabella Delta:
import pyspark.sql.functions as f
from pyspark.sql.types import *
df = (
spark.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
Schema = StructType([
StructField("<column_name_01>", StringType(), False),
StructField("<column_name_02>", StringType(), False),
StructField("<column_name_03>", DoubleType(), True),
StructField("<column_name_04>", LongType(), True),
StructField("<column_name_05>", LongType(), True),
])
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.toTable("deltaeventstable")
)
Nel codice imposta format("delta") Delta come formato di output, outputMode("append") scrive solo nuove righe nella tabella e toTable("deltaeventstable") salva in modo permanente i dati trasmessi in una tabella Delta gestita.
Ottimizzare le prestazioni di streaming
Una volta eseguita l'inserimento di streaming di base, è possibile migliorare la velocità effettiva e l'organizzazione dei file con le tecniche di ottimizzazione descritte nelle sezioni seguenti.
Dati di partizione per le scritture
Per ottimizzare la velocità effettiva, partizionare i dati in modo efficace. Il partizionamento migliora sia la velocità effettiva di scrittura che le prestazioni delle query downstream. È possibile partizionare i dati in memoria, su disco o su entrambi.
Su disco : usare partitionBy() per organizzare i dati in sottodirectory in base ai valori di colonna. Scegliere le colonne con cardinalità ottimale che producono file di dimensioni ottimali. Evitare colonne che creano troppe partizioni troppo piccole o troppo poche partizioni troppo grandi.
In memoria : usare repartition() o coalesce() per distribuire i dati tra i nodi di lavoro prima di scrivere:
-
repartition()aumenta o diminuisce le partizioni con uno shuffle completo, bilanciando i dati in modo uniforme. -
coalesce()riduce solo le partizioni, riducendo al minimo lo spostamento dei dati.
La combinazione di entrambi gli approcci è ideale per scenari con velocità effettiva elevata. L'esempio seguente suddivide i dati in 48 partizioni in memoria (corrispondenti ai core CPU disponibili) e quindi le partizioni su disco per due colonne:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.repartition(48)
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.toTable("deltaeventstable")
)
Usare la scrittura ottimizzata
In alternativa al partizionamento manuale, la Scrittura Ottimizzata unisce o suddivide le partizioni prima di scrivere, massimizzando il throughput del disco senza chiamate manuali repartition() o coalesce(). Abilitalo usando una configurazione Spark:
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", True)
Con Ottimizzazione scrittura abilitata, è possibile rimuovere repartition() o coalesce() dal codice e consentire a Spark di gestire il ridimensionamento delle partizioni. È comunque possibile usare partitionBy() per l'organizzazione a livello di disco.
Eventi batch con trigger
Per ottimizzare ulteriormente le prestazioni di scrittura, raggruppa gli eventi in batch prima di scriverli su disco. Per impostazione predefinita, Spark elabora ogni microbatch non appena viene completato quello precedente. L'impostazione di un intervallo di trigger accumula i dati in un periodo temporale e li scrive in operazioni minori ma più grandi. I batch più grandi producono file Delta più grandi e riducono il sovraccarico di file di piccole dimensioni.
L'esempio seguente elabora gli eventi in intervalli di un minuto:
rawData = (
df
.withColumn("bodyAsString", f.col("body").cast("string"))
.select(f.from_json("bodyAsString", Schema).alias("events"))
.select("events.*")
.writeStream
.format("delta")
.option("checkpointLocation", "Files/checkpoint")
.outputMode("append")
.partitionBy("<column_name_01>", "<column_name_02>")
.trigger(processingTime="1 minute")
.toTable("deltaeventstable")
)
Analizzare il volume dei dati in ingresso e scegliere un intervallo di elaborazione che produca file Parquet ben ridimensionati nella tabella Delta.
Eseguire processi di streaming nell'ambiente di produzione
I notebook Spark sono uno strumento efficace per lo sviluppo e il test della logica di streaming. Tuttavia, per i carichi di lavoro di produzione che devono essere eseguiti in modo continuo, usare invece le definizioni dei processi Spark. Le definizioni dei processi Spark sono attività non interattive orientate al codice eseguite in un cluster Spark e offrono maggiore affidabilità e disponibilità.
L'infrastruttura che esegue un processo di streaming può riscontrare problemi che arrestino il processo, ad esempio errori hardware o applicazione di patch all'infrastruttura. Un criterio di ripetizione dei tentativi riavvia automaticamente l'attività quando si arresta in modo imprevisto. Configurare i criteri di ripetizione dei tentativi in una definizione di processo Spark per specificare quante volte riavviare il processo (fino a tentativi infiniti) e l'intervallo di tempo tra i tentativi. Con un criterio di ripetizione dei tentativi abilitato, il processo di streaming continua l'esecuzione fino a quando non viene arrestato in modo esplicito.
L'hub di monitoraggio di Fabric include una scheda Structured Streaming con metriche, tra cui frequenza di input, frequenza dei processi, righe di input, durata batch e durata dell'operazione.