Important
This feature is in Public Preview.
Real-time mode is a trigger type for Structured Streaming that enables ultra-low latency data processing with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection, real-time personalization, and instant decision-making systems.
Real-time mode is available in Databricks Runtime 16.4 LTS and above. For step-by-step setup instructions, see Get started with real-time mode. For code examples, see Real-time mode examples.
What is real-time mode?
Operational vs. analytical workloads
Streaming workloads can be broadly divided into analytical workloads and operational workloads:
- Analytical workloads use data ingestion and transformation, typically following the medallion architecture (for example, ingesting data into the bronze, silver, and gold tables).
- Operational workloads consume real-time data, apply business logic, and trigger downstream actions or decisions.
Some examples of operational workloads are:
- Blocking or flagging a credit card transaction in real time if a fraud score exceeds a threshold, based on factors like unusual location, large transaction size, or rapid spending patterns.
- Delivering a promotional message when clickstream data shows a user has been browsing for jeans for five minutes, offering a 25% discount if they purchase in the next 15 minutes.
In general, operational workloads are characterized by the need for sub-second end-to-end latency. This can be achieved with real-time mode in Apache Spark Structured Streaming.
How real-time mode achieves low latency
Real-time mode improves the execution architecture by:
- Executing long-running batches (the default is five minutes), in which the system processes data as it becomes available in the source.
- Scheduling all stages of the query simultaneously. This requires the number of available task slots to be equal to or greater than the number of tasks of all the stages in a batch.
- Passing data between stages as soon as it is produced using a streaming shuffle.
At the end of processing a batch, and before the next batch starts, Structured Streaming checkpoints progress and publishes metrics. The batch duration affects checkpointing frequency:
- Longer batches: Less frequent checkpointing, which means longer replays on failure and delayed metrics availability.
- Shorter batches: More frequent checkpointing, which may affect latency.
Databricks recommends benchmarking real-time mode against your target workload to find the appropriate trigger interval.
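To make the trade-off above concrete, a minimal, illustrative calculation (plain Python, not a Databricks API): a longer trigger interval means fewer checkpoints per hour but a longer worst-case replay after a failure.

```python
# Illustrative only: the trigger interval bounds both checkpoint frequency
# and the worst-case amount of work replayed after a failure.
def checkpoint_tradeoff(trigger_interval_s: float, horizon_s: float = 3600.0):
    """Return (checkpoints per horizon, worst-case replay seconds)."""
    checkpoints = horizon_s / trigger_interval_s
    worst_case_replay_s = trigger_interval_s  # at most one full batch is replayed
    return checkpoints, worst_case_replay_s

for interval in (60, 300, 600):  # 1-, 5-, and 10-minute triggers
    n, replay = checkpoint_tradeoff(interval)
    print(f"{interval:>4}s trigger: {n:.0f} checkpoints/hour, "
          f"up to {replay:.0f}s of replay after a failure")
```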
When to use real-time mode
Choose real-time mode when your use case requires:
- Sub-second latency: Applications that need to respond to data within milliseconds, such as fraud detection systems that must block transactions in real time.
- Operational decision-making: Systems that trigger immediate actions based on incoming data, like real-time offers, alerts, or notifications.
- Continuous processing: Workloads where data must be processed as soon as it arrives, rather than in periodic batches.
Use micro-batch mode (the default Structured Streaming trigger) when:
- Analytical processing: ETL pipelines, data transformations, and medallion architecture implementations where latency requirements are measured in seconds or minutes.
- Cost optimization: Workloads where sub-second latency is not required, as real-time mode requires dedicated compute resources.
- The checkpoint frequency matters: Applications that benefit from more frequent checkpointing for faster recovery.
Requirements and configuration
Real-time mode has specific requirements for compute setup and query configuration. This section describes the prerequisites and configuration steps needed to use real-time mode.
Prerequisites
To use real-time mode, you must meet the following requirements:
- Databricks Runtime 16.4 LTS or above: Real-time mode is only available in DBR 16.4 LTS and later versions.
- Dedicated compute: You must use dedicated (formerly single user) compute. Standard (formerly shared) compute, Lakeflow Spark Declarative Pipelines, and serverless compute are not supported.
- No autoscaling: Autoscaling must be disabled.
- No Photon: Photon acceleration is not supported with real-time mode.
- Spark configuration: You must set `spark.databricks.streaming.realTimeMode.enabled` to `true`.
Compute configuration
Configure your compute with the following settings:
- Set `spark.databricks.streaming.realTimeMode.enabled` to `true` in the Spark configuration.
- Disable autoscaling.
- Disable Photon acceleration.
- Ensure the compute is configured as a dedicated cluster (not standard, Lakeflow Spark Declarative Pipelines, or serverless).
For step-by-step instructions on creating and configuring compute for real-time mode, see Get started with real-time mode.
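As a sketch, the settings above map to one Spark configuration key plus cluster options. Setting and verifying the key on a session (where your workspace permits session-level configuration; otherwise set it in the cluster's Spark config) might look like:

```python
# Assumption: `spark` is an active SparkSession on a dedicated cluster.
# The key below is the one real-time mode requires.
spark.conf.set("spark.databricks.streaming.realTimeMode.enabled", "true")

# Sanity check before starting a real-time query.
assert spark.conf.get("spark.databricks.streaming.realTimeMode.enabled") == "true"
```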
Query configuration
To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.
Python

```python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)
```
Scala

```scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", inputTopic)
  .load()
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", outputTopic)
  .option("checkpointLocation", checkpointLocation)
  .outputMode("update")
  .trigger(RealTimeTrigger.apply())
  // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
  // For example, this code indicates a checkpoint interval of 5 minutes:
  // .trigger(RealTimeTrigger.apply("5 minutes"))
  .start()
```
Compute sizing
You can run one real-time job per compute resource if the compute has enough task slots.
To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.
Slot calculation examples
| Pipeline type | Configuration | Required slots |
|---|---|---|
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 |
8 slots |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 |
28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | maxPartitions = 8, two shuffle stages of 20 each |
48 slots (8 + 20 + 20) |
If you don't set maxPartitions, use the number of partitions in the Kafka topic when calculating the required slots.
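The examples above reduce to simple arithmetic: slots for the source stage (maxPartitions, or the topic's partition count) plus the shuffle partitions of each downstream stage. A minimal sketch:

```python
def required_slots(source_partitions: int, shuffle_stages: list[int]) -> int:
    """Total task slots needed to schedule every stage of a batch at once."""
    return source_partitions + sum(shuffle_stages)

print(required_slots(8, []))        # single-stage stateless: 8 slots
print(required_slots(8, [20]))      # two-stage stateful: 28 slots
print(required_slots(8, [20, 20]))  # three-stage: 48 slots
```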
Key considerations
When you configure your compute, consider the following:
- Unlike micro-batch mode, real-time tasks can stay idle while waiting for data, so right-sizing is essential to avoid wasted resources.
- Aim for a target utilization level (for example, 50%) by tuning:
  - `maxPartitions` (for Kafka)
  - `spark.sql.shuffle.partitions` (for shuffle stages)
- Databricks recommends setting `maxPartitions` so that each task handles multiple Kafka partitions to reduce overhead.
- For simple one-stage jobs, adjust task slots per worker to match the workload.
- For shuffle-heavy jobs, experiment to find the minimum number of shuffle partitions that avoids backlogs and adjust from there. The compute won't schedule the job if it doesn't have enough slots.
Note
In Databricks Runtime 16.4 LTS and above, all real-time pipelines use checkpoint v2, which allows seamless switching between real-time and micro-batch modes.
Optimization techniques
| Technique | Enabled by default |
|---|---|
| Asynchronous progress tracking: Moves writing to offset log and commit log into an asynchronous thread, reducing the inter-batch time between two micro-batches. This can help reduce the latency of stateless streaming queries. | No |
| Asynchronous state checkpointing: Helps reduce the latency of stateful streaming queries by beginning to process the next micro-batch as soon as the computation of the previous micro-batch completes, without waiting for state checkpointing. | No |
Monitoring and observability
Measuring query performance is essential for real-time workloads. In real-time mode, traditional batch duration metrics don't reflect actual latency, so you need alternative approaches.
End-to-end latency is workload-specific and sometimes can only be accurately measured with business logic. For example, if the source timestamp is output in Kafka, you can calculate latency as the difference between Kafka's output timestamp and the source timestamp.
You can also estimate end-to-end latency using the built-in metrics and APIs described below.
Built-in metrics with StreamingQueryProgress
The following metrics are included in the StreamingQueryProgress event, which is automatically logged in the driver logs. You can also access them through the StreamingQueryListener's onQueryProgress() callback function. QueryProgressEvent.json() or toString() include extra real-time mode metrics.
- Processing latency (processingLatencyMs). The time elapsed between when the real-time mode query reads a record and when the query writes it to the next stage or downstream. For single-stage queries, this measures the same duration as E2E latency. The system reports this metric per task.
- Source queuing latency (sourceQueuingLatencyMs). The amount of time elapsed between when the system writes a record to a message bus, for example, the log append time in Kafka, and when the real-time mode query first reads the record. The system reports this metric per task.
- E2E Latency (e2eLatencyMs). The time between when the system writes the record to a message bus and when the real-time mode query writes the record downstream. The system aggregates this metric per batch across all records processed by all tasks.
For example:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Custom latency measurement with Observe API
The Observe API helps measure latency without launching another job. If you have a source timestamp that approximates the source data arrival time, you can estimate each batch's latency using the Observe API. Pass the timestamp before reaching the sink:
Python

```python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
    return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
    "latency",
    unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
    "observedLatency",
    avg(col("latency")).alias("avg"),
    max(col("latency")).alias("max"),
    percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
    percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .writeStream, etc.
```
Scala

```scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .writeStream, etc.
```
In this example, a current timestamp is recorded before outputting the entry, and latency is estimated by calculating the difference between this timestamp and the record's source timestamp. The results are included in progress reports and made available to listeners. Here is a sample output:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
Feature support and limitations
This section describes the supported features and current limitations of real-time mode, including compatible environments, languages, sources, sinks, operators, and special considerations for specific features.
Supported environments, languages, and modes
| Compute type | Supported |
|---|---|
| Dedicated (formerly: single user) | Yes |
| Standard (formerly: shared) | No |
| Lakeflow Spark Declarative Pipelines Classic | No |
| Lakeflow Spark Declarative Pipelines Serverless | No |
| Serverless | No |
Supported languages:
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Supported execution modes:
| Execution Mode | Supported |
|---|---|
| Update mode | Yes |
| Append mode | No |
| Complete mode | No |
Supported sources and sinks
Sources:
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Event Hubs (using Kafka Connector) | Yes |
| Kinesis | Yes (only EFO mode) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Sinks:
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Event Hubs (using Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Arbitrary Sinks (using forEachWriter) | Yes |
Supported operators
| Operators | Supported |
|---|---|
| Stateless Operations | |
| Selection | Yes |
| Projection | Yes |
| UDFs | |
| Scala UDF | Yes (with some limitations) |
| Python UDF | Yes (with some limitations) |
| Aggregation | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Aggregations functions | Yes |
| Windowing | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplication | |
| dropDuplicates | Yes (the state is unbounded) |
| dropDuplicatesWithinWatermark | No |
| Stream - Table Join | |
| Broadcast table (should be small) | Yes |
| Stream - Stream Join | No |
| (flat)MapGroupsWithState | No |
| transformWithState | Yes (with some differences) |
| union | Yes (with some limitations) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | No (see limitation) |
Special considerations
Some operators and features have specific considerations or differences when used in real-time mode.
transformWithState in real-time mode
For building custom stateful applications, Databricks supports transformWithState, an API in Apache Spark Structured Streaming. See Build a custom stateful application for more information about the API and code snippets.
However, there are some differences between how the API behaves in real-time mode and traditional streaming queries that leverage the micro-batch architecture.
- Real-time mode calls the `handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)` method for each row, and the `inputRows` iterator returns a single value. Micro-batch mode calls it once for each key, and the `inputRows` iterator returns all values for that key in the micro-batch. You must be aware of this difference when writing your code.
- Event time timers are not supported in real-time mode.
- In real-time mode, timers are delayed in firing depending on data arrival:
- If a timer is scheduled for 10:00:00 but no data arrives, the timer doesn't fire immediately.
- If data arrives at 10:00:10, the timer fires with a 10-second delay.
- If no data arrives and the long-running batch is terminating, the timer fires before the batch terminates.
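The per-row versus per-key contract can be illustrated outside Spark. The sketch below is hypothetical plain Python (not the transformWithState API): it counts how many times a stand-in handler is invoked for the same three input rows under each mode's calling convention.

```python
from collections import defaultdict

calls = []

def handle_input_rows(key, input_rows):
    # Stand-in for a transformWithState handler; records each invocation.
    calls.append((key, list(input_rows)))

events = [("card-1", 10), ("card-2", 7), ("card-1", 3)]

# Real-time mode contract: one call per row; the iterator yields a single value.
calls.clear()
for key, value in events:
    handle_input_rows(key, iter([value]))
realtime_calls = len(calls)  # 3 calls, one per row

# Micro-batch contract: one call per key; the iterator yields all of that
# key's values in the micro-batch.
calls.clear()
grouped = defaultdict(list)
for key, value in events:
    grouped[key].append(value)
for key, values in grouped.items():
    handle_input_rows(key, iter(values))
microbatch_calls = len(calls)  # 2 calls, one per distinct key

print(realtime_calls, microbatch_calls)
```

Handler logic that accumulates across the iterator (for example, summing all of a key's values in one call) therefore behaves differently in real-time mode and must rely on the state store instead.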
Python UDFs in real-time mode
Databricks supports the majority of Python user-defined functions (UDFs) in real-time mode:
| UDF type | Supported |
|---|---|
| Stateless UDF | |
| Python scalar UDF (link) | Yes |
| Arrow scalar UDF | Yes |
| Pandas scalar UDF (link) | Yes |
| Arrow function (mapInArrow) | Yes |
| Pandas function (link) | Yes |
| Stateful Grouping UDF (UDAF) | |
| transformWithState (only Row interface) | Yes |
| applyInPandasWithState | No |
| Non-stateful Grouping UDF (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Table function | |
| UDTF (link) | No |
| UC UDF | No |
There are several points to consider when using Python UDFs in real-time mode:
- To minimize latency, set the Arrow batch size (`spark.sql.execution.arrow.maxRecordsPerBatch`) to 1.
  - Trade-off: This configuration optimizes for latency at the expense of throughput. For most workloads, this setting is recommended.
  - Increase the batch size only if higher throughput is required to accommodate input volume, accepting the potential increase in latency.
- Pandas UDFs and functions do not perform well with an Arrow batch size of 1.
  - If you use pandas UDFs or functions, set the Arrow batch size to a higher value (for example, 100 or higher).
  - Note that this implies higher latency. Databricks recommends using an Arrow UDF or function if possible.
- Due to the performance issue with pandas, transformWithState is only supported with the `Row` interface.
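As a sketch, the batch-size trade-off comes down to a single configuration key; choose the value based on whether the query uses Arrow or pandas UDFs:

```python
# Assumption: `spark` is an active SparkSession.
# Latency-first: each Arrow batch carries a single record.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1")

# Throughput-first (or when pandas UDFs are unavoidable): larger batches,
# at the cost of higher per-record latency.
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")
```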
Limitations
Source limitations
For Kinesis, real-time mode doesn't support polling mode. Moreover, frequent repartitions might negatively impact latency.
Union limitations
The Union operator has some limitations:
- Real-time mode doesn't support self-union:
  - Kafka: You can't use the same source DataFrame object and union DataFrames derived from it. Workaround: Use different DataFrames that read from the same source.
  - Kinesis: You can't union DataFrames derived from the same Kinesis source with the same configuration. Workaround: In addition to using different DataFrames, assign a different 'consumerName' option to each DataFrame.
- Real-time mode doesn't support stateful operators (for example, aggregate, deduplicate, transformWithState) defined before the union.
- Real-time mode doesn't support union with batch sources.
MapPartitions limitation
mapPartitions in Scala and similar Python APIs (mapInPandas, mapInArrow) take an iterator over the entire input partition and produce an iterator over the entire output, with an arbitrary mapping between input and output. These APIs can cause performance issues in real-time mode because they can block the entire output until the input is consumed, which increases latency. The semantics of these APIs also don't support watermark propagation well.
Use scalar UDFs combined with Transform complex data types or filter instead to achieve similar functionality.
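The blocking behavior can be seen with ordinary Python iterators. This hypothetical sketch (plain Python, not Spark) contrasts a per-record transform, which can emit output as soon as each record arrives, with a partition-wide transform that materializes the whole partition before emitting anything:

```python
def per_record(records):
    # Scalar-UDF-like: an output is available after each input record.
    for r in records:
        yield r * 2

def per_partition(records):
    # mapPartitions-like: sorting forces the whole partition to be
    # consumed before the first output can be emitted.
    buffered = sorted(records)
    for r in buffered:
        yield r * 2

stream = iter([3, 1, 2])
fast = per_record(stream)
print(next(fast))  # first output after a single input record -> 6
```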
Next steps
Now that you understand what real-time mode is and how to configure it, explore these resources to start implementing real-time streaming applications:
- Get started with real-time mode - Follow step-by-step instructions to configure compute and run your first real-time streaming query.
- Real-time mode code examples - Explore working examples including Kafka sources and sinks, stateful queries, aggregations, and custom sinks.
- Structured Streaming concepts - Learn the foundational concepts of Structured Streaming on Databricks.