Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite werden zustandsbehaftete strukturierte Streamingabfragen erläutert, einschließlich zustandsbehafteter Vorgänge, Optimierungsempfehlungen, Verkettung mehrerer zustandsbehafteter Operatoren und Zustands-Rebalancing.
Eine zustandsbehaftete strukturierte Streaming-Abfrage erfordert inkrementelle Aktualisierungen von Zwischenzustandsinformationen, während eine zustandslose strukturierte Streaming-Abfrage nur Informationen darüber verfolgt, welche Datensätze von der Datenquelle zur Senke verarbeitet wurden. Optimierungsfunktionen, die für zustandslose Abfragen verfügbar sind, finden Sie unter Optimierung zustandsloser Streaming-Abfragen.
Zustandsbehaftete Vorgänge
Zustandsbehaftete Vorgänge umfassen Streaming-Aggregation, distinct, dropDuplicates Stream-Stream-Verknüpfungen und benutzerdefinierte zustandsbehaftete Anwendungen.
Die für zustandsbehafteten Strukturierten Streaming-Abfragen erforderlichen Zwischenstatusinformationen können zu unerwarteten Latenz- und Produktionsproblemen führen, wenn sie falsch konfiguriert sind.
In Databricks Runtime 13.3 LTS oder höher können Sie die Änderungsprotokollierung mit RocksDB aktivieren, um die Prüfpunkterstellungsdauer und die End-to-End-Latenz für Structured Streaming-Workloads zu senken. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren. Weitere Informationen unter Aktivieren der Änderungsprotokollprüfpunkte.
Optimierung von stateful Structured Streaming-Abfragen
Databricks empfiehlt Folgendes für zustandsbehaftete Strukturierte Streaming-Abfragen:
- Verwenden Sie compute-optimierte Instanzen als Worker.
- Legen Sie die Anzahl der Shuffle-Partitionen auf 1-2 mal die Anzahl der Kerne im Cluster fest.
- Legen Sie die
spark.sql.streaming.noDataMicroBatches.enabled-Konfiguration in der SparkSession auffalsefest. Dadurch wird verhindert, dass das Streaming-Mikrobatchmodul Mikrobatches verarbeitet, die keine Daten enthalten. Wenn Sie diese Konfiguration auffalsefestlegen, kann dies auch dazu führen, dass zustandsbehaftete Vorgänge, die Wasserzeichen oder Verarbeitungszeittimeouts verwenden, erst dann Datenausgaben erhalten, wenn neue Daten eingehen, anstatt sofort.
Databricks empfiehlt die Verwendung von RocksDB mit Changelog-Prüfpunkten, um den Zustand für zustandsbehaftete Datenströme zu verwalten. Weitere Informationen finden Sie unter Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks.
Hinweis
Das Zustandsverwaltungsschema kann bei Neustarts von Abfragen nicht geändert werden. Wenn eine Abfrage mit der Standardverwaltung gestartet wurde, müssen Sie sie von Grund auf neu mit einem neuen Prüfpunktspeicher neu starten, um den Zustandsspeicher zu ändern.
Arbeiten mit mehreren zustandsbehafteten Operatoren im strukturierten Streaming
In Databricks Runtime 13.3 LTS oder höher bietet Azure Databricks erweiterte Unterstützung für zustandsbehaftete Operatoren in strukturierten Streaming-Workloads. Sie können mehrere zustandsbehaftete Operatoren miteinander verketten, was bedeutet, dass Sie die Ausgabe eines Vorgangs, z. B. eine Fensteraggregation, in einen anderen zustandsbehafteten Vorgang, z. B. eine Verknüpfung, übertragen können.
In Databricks Runtime 16.2 oder höher können Sie in Workloads mit mehreren zustandsbehafteten Operatoren verwenden transformWithState . Siehe Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung.
Die folgenden Beispiele veranschaulichen mehrere Muster, die Sie verwenden können.
Wichtig
Beim Arbeiten mit mehreren zustandsbehafteten Operatoren bestehen die folgenden Einschränkungen:
- Veraltete benutzerdefinierte zustandsbehaftete Operatoren (
FlatMapGroupWithStateundapplyInPandasWithState) werden nicht unterstützt. - Nur der Modus „Ausgabe anfügen“ wird unterstützt.
Aggregation verketteter Zeitfenster
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Aggregation von Zeitfenstern in zwei verschiedenen Streams, gefolgt von Stream-Stream-Fenster-Verknüpfung
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Stream-Stream-Zeitintervallverknüpfung gefolgt von Zeitfensteraggregation
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Statusausgleich für strukturiertes Streaming
Die Zustandsneuausgleich ist standardmäßig für alle Streaming-Arbeitslasten in Lakeflow Spark Declarative Pipelines aktiviert. In Databricks Runtime 11.3 LTS oder höher können Sie die folgende Konfigurationsoption in der Spark-Clusterkonfiguration festlegen, um die Zustandsrebalancing zu aktivieren:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Von der Zustandsneuordnung profitieren zustandsbehaftete Pipelines für strukturiertes Streaming, die Clustergrößenänderungen durchlaufen. Zustandslose Streamingvorgänge profitieren nicht, unabhängig von der Änderung der Clustergrößen.
Hinweis
Die automatische Computeskalierung hat Einschränkungen beim Verkleinern der Clustergröße für strukturierte Streaming-Workloads. Databricks empfiehlt die Verwendung von Lakeflow Spark Declarative Pipelines mit verbesserter automatischer Skalierung für Streaming-Workloads. Siehe Optimieren der Clusternutzung von Lakeflow Spark Declarative Pipelines mit automatischer Skalierung.
Cluster-Resizing-Ereignisse lösen die Rebalancierung des Zustands aus. Mikrobatches können bei rebalancing-Ereignissen eine höhere Latenz haben, da der Zustand vom Cloudspeicher an die neuen Executoren geladen wird.