Freigeben über


Anwenden von Wasserzeichen zum Steuern von Datenverarbeitungsschwellenwerten

Auf dieser Seite werden die grundlegenden Konzepte der Wasserzeichenerstellung beschrieben und Empfehlungen für die Verwendung von Wasserzeichen in gängigen zustandsbehafteten Streamingvorgängen bereitgestellt. Sie müssen Wasserzeichen auf zustandsbehaftete Streamingvorgänge anwenden, um eine unendliche Erweiterung der im Zustand gespeicherten Datenmenge zu vermeiden, wodurch Speicherprobleme auftreten oder die Verarbeitungslatenz während lang andauernder Streamingvorgänge erhöht werden kann.

Was ist ein Wasserzeichen?

Strukturiertes Streaming verwendet Wasserzeichen, um den Schwellenwert für die Dauer der Verarbeitung von Updates für eine bestimmte Zustandsentität zu steuern. Zu den häufigen Beispielen von staatlichen Einrichtungen gehören:

  • Aggregationen über ein Zeitfenster.
  • Eindeutige Schlüssel in einer Verknüpfung zwischen zwei Streams.

Wenn Sie ein Wasserzeichen deklarieren, geben Sie ein Zeitstempelfeld und einen Wasserzeichenschwellenwert für einen Streaming-DataFrame an. Wenn neue Daten eintreffen, verfolgt der Zustandsmanager den letzten Zeitstempel im angegebenen Feld und verarbeitet alle Datensätze innerhalb des Verspätungsschwellenwerts.

Im folgenden Beispiel wird ein Wasserzeichenschwellenwert von 10 Minuten auf eine fensterübergreifende Zählung angewendet.

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In diesem Beispiel:

  • Die Spalte event_time wird verwendet, um ein Wasserzeichen von 10 Minuten und ein rollierendes Fenster von 5 Minuten zu definieren.
  • Für jedes beobachtete id wird eine Zählung in jedem nicht überlappenden 5-Minuten-Fenster erfasst.
  • Zustandsdaten werden für jede Zählung beibehalten, bis das Ende des Fensters um 10 Minuten älter ist als das zuletzt beobachtete event_time.

Wichtig

Wasserzeichen-Schwellenwerte garantieren, dass Datensätze, die innerhalb des angegebenen Schwellenwerts eintreffen, gemäß der Semantik der definierten Abfrage verarbeitet werden. Spät ankommende Datensätze, die außerhalb des angegebenen Schwellenwerts ankommen, werden möglicherweise weiterhin mithilfe von Abfragemetriken verarbeitet, dies ist jedoch nicht garantiert.

Wie wirken sich Wasserzeichen auf die Verarbeitungszeit und den Durchsatz aus?

Wasserzeichen interagieren mit Ausgabemodi, um zu steuern, wann Daten in die Senke geschrieben werden. Da Wasserzeichen die Gesamtmenge der zu verarbeitenden Zustandsinformationen reduzieren, ist eine effektive Verwendung von Wasserzeichen für einen effizienten zustandsbehafteten Streamingdurchsatz unerlässlich.

Hinweis

Nicht alle Ausgabemodi werden für alle zustandsbehafteten Operationen unterstützt.

Wasserzeichen und Ausgabemodus für Fensteraggregationen

In der folgenden Tabelle wird die Verarbeitung von Abfragen mit Aggregation auf einem Zeitstempel mit einem definierten Wasserzeichen beschrieben:

Ausgabemodus Verhalten
Anfügen Zeilen werden in die Zieltabelle geschrieben, nachdem der Wasserzeichenschwellenwert überschritten wurde. Alle Schreibvorgänge werden basierend auf dem Schwellenwert für Verspätungen verzögert. Der alte Aggregationszustand wird gelöscht, nachdem der Schwellenwert überschritten wurde.
Aktualisieren Zeilen werden beim Berechnen der Ergebnisse in die Zieltabelle geschrieben und können aktualisiert und überschrieben werden, wenn neue Daten eintreffen. Der alte Aggregationszustand wird gelöscht, nachdem der Schwellenwert überschritten wurde.
Abgeschlossen Der Aggregationsstatus wird nicht gelöscht. Die Zieltabelle wird mit jedem Trigger neu geschrieben.

Wasserzeichen und Ausgabe für Datenstromverknüpfungen

Verknüpfungen zwischen mehreren Datenströmen unterstützen nur den Anfügemodus, und übereinstimmende Datensätze werden in jeden Batch geschrieben, in dem sie ermittelt wurden. Für innere Verknüpfungen empfiehlt Databricks, einen Wasserzeichenschwellenwert für jede Streamingdatenquelle festzulegen. Dadurch können Zustandsinformationen für alte Datensätze verworfen werden. Ohne Wasserzeichen versucht strukturiertes Streaming, jeden Schlüssel von beiden Seiten der Verknüpfung mit jedem Trigger zu verknüpfen.

Strukturiertes Streaming verfügt über eine spezielle Semantik, um äußere Verknüpfungen zu unterstützen. Wasserzeichen sind für äußere Verknüpfungen obligatorisch. Sie geben an, wann ein Schlüssel mit einem NULL-Wert geschrieben werden muss, nachdem keine Übereinstimmungen dafür gefunden wurden. Während äußere Verknüpfungen nützlich sein können, um Datensätze aufzuzeichnen, die während der Datenverarbeitung nie übereinstimmen, da Verknüpfungen nur als Anfügevorgänge in Tabellen geschrieben werden, werden diese fehlenden Daten erst aufgezeichnet, nachdem der Schwellenwert für späte Daten überschritten wurde.

Steuern des Schwellenwerts für verspätete Daten mit mehreren Wasserzeichenrichtlinien beim strukturierten Streaming

Wenn Sie mit mehreren strukturierten Streaming-Eingaben arbeiten, können Sie mehrere Grenzwerte festlegen, um Toleranzschwellenwerte für spät ankommende Daten zu steuern. Durch die Konfiguration von Wasserzeichen können Sie Zustandsinformationen steuern und die Latenz beeinflussen.

Eine Streamingabfrage kann mehrere Eingabestreams aufweisen, die vereinigt oder miteinander verbunden werden. Jeder Eingabestream kann über einen anderen Schwellenwert für verspätete Daten verfügen, der für zustandsbehaftete Vorgänge toleriert werden muss. Geben Sie diese Schwellenwerte mit withWatermarks("eventTime", delay) für jeden der Eingabestreams an. Im Folgenden ist eine Abfrage als Beispiel mit Stream-Stream-Joins dargestellt.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Beim Ausführen der Abfrage führt das strukturierte Streaming die maximale Ereigniszeit jedes Eingabestreams einzeln nach, berechnet Grenzwerte basierend auf der entsprechenden Verzögerung und wählt einen einzelnen globalen Grenzwert aus, das für zustandsbehaftete Vorgänge verwendet werden soll. Standardmäßig wird das Minimum als globales Wasserzeichen ausgewählt, da dadurch verhindert wird, dass Daten versehentlich als zu spät gelöscht werden, wenn einer der Datenströme hinter die anderen fällt (z. B. einer der Datenströme beendet den Empfang von Daten aufgrund von Upstreamfehlern). Anders ausgedrückt: Der globale Grenzwert bewegt sich sicher mit der Geschwindigkeit des langsamsten Streams, und die Abfrageausgabe verzögert sich entsprechend.

Wenn Sie schnellere Ergebnisse erzielen möchten, können Sie die Richtlinie für mehrere Grenzwerte so festlegen, dass der Höchstwert als globaler Grenzwert ausgewählt wird, indem Sie die SQL-Konfiguration spark.sql.streaming.multipleWatermarkPolicy auf max setzen (Standardeinstellung ist min). Dadurch kann sich das globale Wasserzeichen mit der Geschwindigkeit des schnellsten Streams bewegen. Diese Konfiguration verwirft jedoch Daten aus den langsamsten Datenströmen. Databricks empfiehlt, diese Konfiguration sorgfältig zu verwenden.

Anwenden von Wasserzeichen auf unterschiedliche Vorgänge

Der distinct-Vorgang ist ein zustandsbehafteter Operator, der Wasserzeichen erfordert, um ein unbegrenztes Anwachsen des Zustands zu verhindern. Ohne Wasserzeichen versucht Structured Streaming, jeden eindeutigen Datensatz unbegrenzt nachzuverfolgen, was zu Speicherproblemen oder erhöhten Verarbeitungslatenz führen kann.

Wenn Sie distinct auf einen Streaming-Datenrahmen anwenden, müssen Sie ein Wasserzeichen für ein Zeitstempel-Feld angeben. Das Wasserzeichen steuert, wie lange der Zustandsmanager Datensätze für die Deduplizierung verwaltet. Nachdem der Wasserzeichenschwellenwert überschritten wurde, werden alte Datensätze aus dem Speicher entfernt.

Im folgenden Beispiel wird ein Wasserzeichen auf einen distinct Vorgang angewendet:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

In diesem Beispiel werden doppelte Datensätze, die innerhalb von 1 Stunde nach der zuletzt beobachteten eventTime ankommen, aus dem Datenstrom entfernt. Statusinformationen für die Deduplizierung werden nach ablaufen des Schwellenwerts verworfen.

Wichtig

Wenn Sie bestimmte Spalten anstelle aller Spalten deduplizieren müssen, verwenden dropDuplicates() Oder dropDuplicatesWithinWatermark() anstelle von distinct. Weitere Informationen finden Sie im nächsten Abschnitt.

Duplikate innerhalb des Wasserzeichens löschen

In Databricks Runtime 13.3 LTS oder höher können Sie Datensätze innerhalb eines Wasserzeichenschwellenwerts mithilfe eines eindeutigen Bezeichners deduplizieren.

Strukturiertes Streaming bietet die Garantie der einmaligen Verarbeitung, dupliziert jedoch keine Datensätze aus Datenquellen automatisch. Sie können dropDuplicatesWithinWatermark verwenden, um Datensätze für jedes angegebene Feld zu deduplizieren, sodass Sie Duplikate aus einem Stream entfernen können, auch wenn sich einige Felder unterscheiden (z. B. Ereigniszeit oder Ankunftszeit).

Doppelte Datensätze, die innerhalb des angegebenen Wasserzeichens eintreffen, werden garantiert gelöscht. Diese Garantie gilt nur in eine Richtung. Doppelte Datensätze, die außerhalb des festgelegten Schwellenwerts eintreffen, werden möglicherweise ebenfalls gelöscht. Sie müssen den Schwellenwert für die Verzögerung des Wasserzeichens länger als die maximale Zeitstempeldifferenz zwischen den duplizierten Ereignissen einstellen, um alle Duplikate zu entfernen.

Sie müssen ein Wasserzeichen angeben, um die dropDuplicatesWithinWatermark-Methode zu verwenden, wie im folgenden Beispiel:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))