Freigeben über


Beispiele für Workflows in Lakeflow Spark Declarative Pipelines

Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Topics

Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen kafka_target erstellt, und es wird aus zwei Kafka-Topics in diese Streamingtabelle geschrieben.

Python

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Weitere Informationen zur read_kafka() in den SQL-Abfragen verwendeten Tabellenwertfunktion finden Sie unter read_kafka in der SQL-Sprachreferenz.

In Python können Sie programmgesteuert mehrere Flüsse erstellen, die auf eine einzelne Tabelle abzielen. Das folgende Beispiel zeigt dieses Muster für eine Liste der Kafka-Themen.

Hinweis

Dieses Muster hat die gleichen Anforderungen wie die Verwendung einer for Schleife zum Erstellen von Tabellen. Sie müssen einen Python-Wert explizit an die Funktion übergeben, die den Fluss definiert. Siehe Erstellen von Tabellen in einer for Schleife.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Beispiel: Ausführen eines einmaligen Datenrückfüllens

Wenn Sie eine Abfrage ausführen möchten, um Daten an eine vorhandene Streamingtabelle anzufügen, verwenden Sie append_flow.

Nach dem Anfügen einer Reihe vorhandener Daten haben Sie mehrere Optionen:

  • Wenn die Abfrage neue Daten anhängen soll, falls diese im Backfill-Verzeichnis eingehen, lassen Sie die Abfrage unverändert.
  • Wenn dies ein einmaliger Backfill sein soll und nie wieder ausgeführt werden soll, entfernen Sie die Abfrage, nachdem die Pipeline einmal ausgeführt wurde.
  • Wenn Sie möchten, dass die Abfrage einmal ausgeführt wird und nur dann erneut ausgeführt wird, wenn die Daten vollständig aktualisiert werden, legen Sie den once-Parameter auf True im Anfügefluss fest. Verwenden Sie INSERT INTO ONCEin SQL .

In den folgenden Beispielen wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:

Python

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Ein ausführlicheres Beispiel finden Sie unter "Zurückfüllen von historischen Daten mit Pipelines".

Beispiel: Verwenden Sie die Anfügeflussverarbeitung anstelle von UNION

Anstatt eine Abfrage mit einer UNION Klausel zu verwenden, können Sie Anfügeflussabfragen verwenden, um mehrere Quellen zu kombinieren und in eine einzelne Streamingtabelle zu schreiben. Verwenden Sie Append-Flow-Abfragen anstelle von UNION, können Sie Daten aus mehreren Quellen an eine Streamingtabelle hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.

Das folgende Python-Beispiel enthält eine Abfrage, die mehrere Datenquellen mit einer UNION Klausel kombiniert:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Die folgenden Beispiele ersetzen die UNION Abfrage durch Anfügeflussabfragen:

Python

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );

Beispiel: Zum transformWithState überwachen von Sensor-Heartbeats

Das Beispiel zeigt einen zustandsbehafteten Prozessor, der aus Kafka liest und überprüft, ob Sensoren regelmäßig Herzschläge abgeben. Wenn ein Heartbeat nicht innerhalb von 5 Minuten empfangen wird, schreibt der Prozessor einen Eintrag in die Ziel-Delta-Tabelle zur Analyse.

Weitere Informationen zum Erstellen benutzerdefinierter zustandsbehafteter Anwendungen finden Sie unter Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung.

Hinweis

RocksDB ist der Standardstatusanbieter ab Databricks Runtime 17.2. Wenn die Abfrage aufgrund einer Ausnahme eines nicht unterstützten Anbieters fehlschlägt, fügen Sie die folgenden Pipelinekonfigurationen hinzu, führen Sie eine vollständige Aktualisierungs- oder Prüfpunktzurücksetzung durch, und führen Sie dann die Pipeline erneut aus:

"configuration": {
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator

import pandas as pd

from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType

KAFKA_TOPIC = "<your-kafka-topic>"

output_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("last_heartbeat_time", TimestampType(), False)])

class SensorHeartbeatProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define state schema to store sensor information (sensor_id is the grouping key)
        state_schema = StructType([
            StructField("sensor_type", StringType(), False),
            StructField("last_heartbeat_time", TimestampType(), False)])
        self.sensor_state = handle.getValueState("sensorState", state_schema)
        # State variable to track the previously registered timer
        timer_schema = StructType([StructField("timer_ts", LongType(), False)])
        self.timer_state = handle.getValueState("timerState", timer_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Process one row from input and update state
        pdf = next(rows)
        row = pdf.iloc[0]
        # Store or update the sensor information in state using current timestamp
        current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
        self.sensor_state.update((
            row["sensor_type"],
            current_time
        ))

        # Delete old timer if already registered
        if self.timer_state.exists():
            old_timer = self.timer_state.get()[0]
            self.handle.deleteTimer(old_timer)

        # Register a timer for 5 minutes from current processing time
        expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
        self.handle.registerTimer(expiry_time)
        # Store the new timer timestamp in state
        self.timer_state.update((expiry_time,))

        # No output on input processing, output only on timer expiry
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Emit output row based on state store
        if self.sensor_state.exists():
            state = self.sensor_state.get()
            output = pd.DataFrame({
                "sensor_id": [key[0]],  # Use grouping key as sensor_id
                "sensor_type": [state[0]],
                "last_heartbeat_time": [state[1]]
            })
            # Remove the entry for the sensor from the state store
            self.sensor_state.clear()
            # Remove the timer state entry
            self.timer_state.clear()
            yield output

    def close(self) -> None:
        pass

dp.create_streaming_table("sensorAlerts")

# Define the schema for the Kafka message value
sensor_schema = StructType([
    StructField("sensor_id", LongType(), False),
    StructField("sensor_type", StringType(), False),
    StructField("sensor_value", LongType(), False)])

@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
    return (
      spark.readStream
        .format("kafka")
        .option("subscribe", KAFKA_TOPIC)
        .option("startingOffsets", "earliest")
        .load()
        .select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
        .select("data.*", "timestamp")
        .withWatermark('timestamp', '1 hour')
        .groupBy(col("sensor_id"))
        .transformWithStateInPandas(
          statefulProcessor = SensorHeartbeatProcessor(),
          outputStructType = output_schema,
          outputMode = 'update',
          timeMode = 'ProcessingTime'))