Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Beispiel: Schreiben in eine Streamingtabelle aus mehreren Kafka-Topics
Im folgenden Beispiel wird eine Streamingtabelle mit dem Namen kafka_target erstellt, und es wird aus zwei Kafka-Topics in diese Streamingtabelle geschrieben.
Python
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Weitere Informationen zur read_kafka() in den SQL-Abfragen verwendeten Tabellenwertfunktion finden Sie unter read_kafka in der SQL-Sprachreferenz.
In Python können Sie programmgesteuert mehrere Flüsse erstellen, die auf eine einzelne Tabelle abzielen. Das folgende Beispiel zeigt dieses Muster für eine Liste der Kafka-Themen.
Hinweis
Dieses Muster hat die gleichen Anforderungen wie die Verwendung einer for Schleife zum Erstellen von Tabellen. Sie müssen einen Python-Wert explizit an die Funktion übergeben, die den Fluss definiert. Siehe Erstellen von Tabellen in einer for Schleife.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Beispiel: Ausführen eines einmaligen Datenrückfüllens
Wenn Sie eine Abfrage ausführen möchten, um Daten an eine vorhandene Streamingtabelle anzufügen, verwenden Sie append_flow.
Nach dem Anfügen einer Reihe vorhandener Daten haben Sie mehrere Optionen:
- Wenn die Abfrage neue Daten anhängen soll, falls diese im Backfill-Verzeichnis eingehen, lassen Sie die Abfrage unverändert.
- Wenn dies ein einmaliger Backfill sein soll und nie wieder ausgeführt werden soll, entfernen Sie die Abfrage, nachdem die Pipeline einmal ausgeführt wurde.
- Wenn Sie möchten, dass die Abfrage einmal ausgeführt wird und nur dann erneut ausgeführt wird, wenn die Daten vollständig aktualisiert werden, legen Sie den
once-Parameter aufTrueim Anfügefluss fest. Verwenden SieINSERT INTO ONCEin SQL .
In den folgenden Beispielen wird eine Abfrage ausgeführt, um verlaufsgeschichtliche Daten an eine Streamingtabelle anzufügen:
Python
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Ein ausführlicheres Beispiel finden Sie unter "Zurückfüllen von historischen Daten mit Pipelines".
Beispiel: Verwenden Sie die Anfügeflussverarbeitung anstelle von UNION
Anstatt eine Abfrage mit einer UNION Klausel zu verwenden, können Sie Anfügeflussabfragen verwenden, um mehrere Quellen zu kombinieren und in eine einzelne Streamingtabelle zu schreiben. Verwenden Sie Append-Flow-Abfragen anstelle von UNION, können Sie Daten aus mehreren Quellen an eine Streamingtabelle hinzufügen, ohne eine vollständige Aktualisierung durchzuführen.
Das folgende Python-Beispiel enthält eine Abfrage, die mehrere Datenquellen mit einer UNION Klausel kombiniert:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Die folgenden Beispiele ersetzen die UNION Abfrage durch Anfügeflussabfragen:
Python
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);
Beispiel: Zum transformWithState überwachen von Sensor-Heartbeats
Das Beispiel zeigt einen zustandsbehafteten Prozessor, der aus Kafka liest und überprüft, ob Sensoren regelmäßig Herzschläge abgeben. Wenn ein Heartbeat nicht innerhalb von 5 Minuten empfangen wird, schreibt der Prozessor einen Eintrag in die Ziel-Delta-Tabelle zur Analyse.
Weitere Informationen zum Erstellen benutzerdefinierter zustandsbehafteter Anwendungen finden Sie unter Erstellen einer benutzerdefinierten zustandsbehafteten Anwendung.
Hinweis
RocksDB ist der Standardstatusanbieter ab Databricks Runtime 17.2. Wenn die Abfrage aufgrund einer Ausnahme eines nicht unterstützten Anbieters fehlschlägt, fügen Sie die folgenden Pipelinekonfigurationen hinzu, führen Sie eine vollständige Aktualisierungs- oder Prüfpunktzurücksetzung durch, und führen Sie dann die Pipeline erneut aus:
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled": "true"
}
from typing import Iterator
import pandas as pd
from pyspark import pipelines as dp
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
KAFKA_TOPIC = "<your-kafka-topic>"
output_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
class SensorHeartbeatProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
# Define state schema to store sensor information (sensor_id is the grouping key)
state_schema = StructType([
StructField("sensor_type", StringType(), False),
StructField("last_heartbeat_time", TimestampType(), False)])
self.sensor_state = handle.getValueState("sensorState", state_schema)
# State variable to track the previously registered timer
timer_schema = StructType([StructField("timer_ts", LongType(), False)])
self.timer_state = handle.getValueState("timerState", timer_schema)
self.handle = handle
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
# Process one row from input and update state
pdf = next(rows)
row = pdf.iloc[0]
# Store or update the sensor information in state using current timestamp
current_time = pd.Timestamp(timerValues.getCurrentProcessingTimeInMs(), unit='ms')
self.sensor_state.update((
row["sensor_type"],
current_time
))
# Delete old timer if already registered
if self.timer_state.exists():
old_timer = self.timer_state.get()[0]
self.handle.deleteTimer(old_timer)
# Register a timer for 5 minutes from current processing time
expiry_time = timerValues.getCurrentProcessingTimeInMs() + (5 * 60 * 1000)
self.handle.registerTimer(expiry_time)
# Store the new timer timestamp in state
self.timer_state.update((expiry_time,))
# No output on input processing, output only on timer expiry
return iter([])
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
# Emit output row based on state store
if self.sensor_state.exists():
state = self.sensor_state.get()
output = pd.DataFrame({
"sensor_id": [key[0]], # Use grouping key as sensor_id
"sensor_type": [state[0]],
"last_heartbeat_time": [state[1]]
})
# Remove the entry for the sensor from the state store
self.sensor_state.clear()
# Remove the timer state entry
self.timer_state.clear()
yield output
def close(self) -> None:
pass
dp.create_streaming_table("sensorAlerts")
# Define the schema for the Kafka message value
sensor_schema = StructType([
StructField("sensor_id", LongType(), False),
StructField("sensor_type", StringType(), False),
StructField("sensor_value", LongType(), False)])
@dp.append_flow(target = "sensorAlerts")
def kafka_delta_flow():
return (
spark.readStream
.format("kafka")
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.load()
.select(from_json(col("value").cast("string"), sensor_schema).alias("data"), col("timestamp"))
.select("data.*", "timestamp")
.withWatermark('timestamp', '1 hour')
.groupBy(col("sensor_id"))
.transformWithStateInPandas(
statefulProcessor = SensorHeartbeatProcessor(),
outputStructType = output_schema,
outputMode = 'update',
timeMode = 'ProcessingTime'))