You can use transformWithState to build stateful streaming applications and to implement low-latency and near real-time solutions. With custom stateful operators, you can create arbitrary stateful logic that allows you to build new operational use cases that are not possible with traditional Structured Streaming processing.
Note
For stateful operations such as aggregations, deduplication, and streaming joins, Databricks recommends using built-in Structured Streaming operators instead of custom logic. See What is stateful streaming?.
Databricks recommends using transformWithState instead of legacy operators, such as flatMapGroupsWithState and mapGroupsWithState, for arbitrary state transformations. See Legacy arbitrary stateful operators.
Requirements
The transformWithState and transformWithStateInPandas operators have the following requirements:
- Available in Databricks Runtime 16.2 and above.
- For real-time mode, use Databricks Runtime 17.3 LTS or above. See Real-time mode in Structured Streaming.
- For standard access mode, Python is available in Databricks Runtime 16.3 and above, and Scala is available in Databricks Runtime 17.3 and above.
- RocksDB is the default state store provider in Databricks Runtime 17.3 and above.
For Databricks Runtime 17.2 and below, you must configure the RocksDB state store provider. Databricks recommends enabling RocksDB in the Spark configuration.
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
What is transformWithState?
The transformWithState operator applies a custom stateful processor to a Structured Streaming query. You must implement a custom stateful processor to use transformWithState. Structured Streaming includes APIs for building your stateful processor using Python, Scala, or Java.
Use transformWithState to apply custom logic to a grouping key. The following describes the high-level design:
- Define one or more state variables.
- State information persists for each grouping key. You can access each state variable in user-defined code.
- For each micro-batch processed, all rows for the key are available as an iterator.
- Use the StatefulProcessorHandle with timers and user-defined conditions to control how to emit rows.
- To manage state expiration and state size, state values support individual time-to-live (TTL) definitions.
Because transformWithState supports schema evolution in the state store, you can iterate and update your production applications without losing historical state information. After updating the state schema, you aren't required to reprocess rows, which simplifies code deployments and maintenance. See Schema evolution in the state store.
Important
Azure Databricks documentation uses transformWithState to describe both Python and Scala implementations:
- PySpark supports both the row-based transformWithState API and the Pandas-based transformWithStateInPandas operator. transformWithStateInPandas is not supported in real-time mode. Instead, use transformWithState. For details, see transformWithState in real-time mode.
- Scala supports only the row-based transformWithState API.
The Scala and Python implementations of transformWithState have the same capabilities, but with some differences in syntax.
Defining a StatefulProcessor
You define a stateful processor by extending the StatefulProcessor class and implementing its methods.
Spark passes a StatefulProcessorHandle to the init method of your StatefulProcessor. Use the handle to create state variables and interact with the state store.
transformWithState supports three state types: ValueState, ListState, and MapState. Each type stores state for each grouping key using a different underlying data structure.
Implement the following methods to define your custom logic:
- Implement handleInputRows to control how your application processes data, updates state, and emits rows for each micro-batch. See Handle input rows.
- Implement handleExpiredTimer to run time-based logic regardless of whether the grouping key receives new rows in a micro-batch. See Handle expired timers.
- Optionally, implement handleInitialState to pre-populate state before your application processes any input rows. See Handle initial state.
The following table compares the functional behaviors of these methods:
| Behavior | handleInputRows | handleExpiredTimer |
|---|---|---|
| Get, put, update, or clear state values | Yes | Yes |
| Create or delete a timer | Yes | Yes |
| Emit rows | Yes | Yes |
| Iterate over rows in the current micro-batch | Yes | No |
| Trigger logic based on elapsed time | No | Yes |
You can combine both handleInputRows and handleExpiredTimer to implement complex logic as needed.
For example, you could implement an application that uses handleInputRows to update state values for each micro-batch and set a timer of 10 seconds in the future. If no additional rows are processed, you can use handleExpiredTimer to emit the current values in the state store. If new rows are processed for the grouping key, you can clear the existing timer and set a new timer.
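The timer-reset pattern described above can be sketched as a plain-Python model. This is not Spark code: InactivityEmitter, its method names, and the dictionaries standing in for state and timers are hypothetical, chosen only to show the control flow of resetting a timer on new rows and emitting when it expires.

```python
from typing import Dict, Iterable, List, Tuple

TIMEOUT_MS = 10_000  # the 10-second delay from the example above


class InactivityEmitter:
    """Plain-Python model of the pattern; a stand-in, not the Spark API."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}  # models one state value per grouping key
        self.timers: Dict[str, int] = {}  # models one pending timer per grouping key

    def handle_input_rows(self, key: str, rows: Iterable[object], now_ms: int) -> None:
        # Update state once for all rows of this key in the micro-batch.
        self.counts[key] = self.counts.get(key, 0) + sum(1 for _ in rows)
        # Overwriting the entry models clearing the old timer and setting a new one.
        self.timers[key] = now_ms + TIMEOUT_MS

    def handle_expired_timers(self, now_ms: int) -> List[Tuple[str, int]]:
        # Fire every timer whose expiry has passed, even for keys without new rows.
        expired = sorted(k for k, t in self.timers.items() if t <= now_ms)
        emitted = []
        for key in expired:
            del self.timers[key]
            emitted.append((key, self.counts[key]))  # emit the current state value
        return emitted


model = InactivityEmitter()
model.handle_input_rows("a", ["r1", "r2"], now_ms=0)
model.handle_input_rows("a", ["r3"], now_ms=5_000)  # timer moves from 10_000 to 15_000
print(model.handle_expired_timers(now_ms=10_000))   # [] because the timer was reset
print(model.handle_expired_timers(now_ms=15_000))   # [('a', 3)]
```

In a real processor, the same steps would use the timer functions on the StatefulProcessorHandle and the timestamps supplied through TimerValues instead of the now_ms argument used here.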
StatefulProcessorHandle
In PySpark, the StatefulProcessorHandle class allows you to access functions that control how your code uses state information.
When initializing a StatefulProcessor, you must always import and pass the StatefulProcessorHandle to the handle variable. The handle variable ties the local variable in your Python class to the state variable.
Note
Scala uses the getHandle method.
Custom state types
You can implement multiple state objects in a single stateful operator.
Choose a state type based on your complete application logic. For example, you could track sessions with a ValueState grouped by user_id and session_id. Or, to evaluate conditions across multiple sessions, use a MapState grouped by user_id with session_id as the map key.
If your state object uses a StructType, you must define unique names for each field in the struct for the schema. These names are visible when reading the state store. See Read Structured Streaming state information.
The following sections describe the state types supported by transformWithState:
ValueState
ValueState stores a value for each grouping key.
A value state can include complex types, such as a struct or tuple. For ValueState, you must implement logic to replace the entire value.
The time-to-live for a value state resets when the value updates. If you process a source key for ValueState without updating the stored ValueState, the time-to-live isn't reset.
ListState
ListState stores a list for each grouping key.
A list state is a collection of values, each of which can include complex types. Each value in a list has its own time-to-live.
You can add items to a list by appending individual items, appending a list of items, or overwriting the entire list with a put. To reset time-to-live, you must use a put operation.
MapState
MapState stores a map for each grouping key. Maps are the Apache Spark equivalent to a Python dictionary (dict).
A map state is a collection of distinct keys that each map to a value, each of which can include complex types. Each key-value pair in a map has its own time-to-live.
You can update the value of a specific key, or you can remove a key and its value. You can return an individual value using its key, list all keys, list all values, or return an iterator to work with the complete set of key-value pairs in the map.
Important
Grouping keys describe the fields specified in the GROUP BY clause of a Structured Streaming query. Map states can contain an arbitrary number of key-value pairs for a grouping key.
For example, if your query uses GROUP BY user_id and you want to define a map for each session_id, your grouping key is user_id and the MapState key is session_id:
Python
class SessionTracker(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.sessions = handle.getMapState("sessions", StringType(), LongType())
    def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
        for row in rows:
            session_id = row["session_id"]  # session_id is the MapState key
            count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
            new_count = count + 1
            self.sessions.updateValue(session_id, (new_count,))
        yield from []
    def close(self) -> None:
        pass
df.groupBy("user_id").transformWithState(SessionTracker(), ...)  # user_id is the grouping key
Scala
case class Event(userId: String, sessionId: String)
class SessionTracker extends StatefulProcessor[String, Event, Row] {
  @transient private var sessions: MapState[String, Long] = _
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    sessions = getHandle.getMapState[String, Long]("sessions", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
  }
  override def handleInputRows(
      key: String,
      rows: Iterator[Event],
      timerValues: TimerValues): Iterator[Row] = {
    rows.foreach { event =>
      val count = if (sessions.containsKey(event.sessionId)) sessions.getValue(event.sessionId) else 0L
      sessions.updateValue(event.sessionId, count + 1) // sessionId is the MapState key
    }
    Iterator.empty
  }
}
df.as[Event]
  .groupByKey(_.userId) // userId is the grouping key
  .transformWithState(new SessionTracker(), TimeMode.None(), OutputMode.Update())
Create a custom state variable in the StatefulProcessor
When you initialize your StatefulProcessor, you create a local variable for each state object that allows you to interact with state objects in your custom logic. Define and initialize state variables by overriding the built-in init method in the StatefulProcessor class.
You can define any number of state objects using the getValueState, getListState, and getMapState methods in your StatefulProcessor.
Each state object must have the following:
- A unique name
- A schema
  - In Python, you must specify the schema.
  - In Scala, you can pass an Encoder to specify the state schema.
Optionally, you can also provide a time-to-live (TTL) duration in milliseconds. If implementing a map state, you must provide a separate schema definition for the map keys and the values.
Note
The StatefulProcessor handles logic separately for querying, updating, and emitting state information. See Use your state variables in methods with custom logic.
Use your state variables in methods with custom logic
State objects have methods for getting state, updating existing state information, and clearing the current state.
Each grouping key has dedicated state information.
- The StatefulProcessor emits rows based on your custom logic and specified output schema. See Emit rows.
- Use the statestore reader to access values in the state store. This reader is intended for batch workloads and isn't intended for low-latency workloads. See Read Structured Streaming state information.
- Logic specified using handleInputRows only runs if rows for the key are present in a micro-batch. See Handle input rows.
- Use handleExpiredTimer to implement time-based logic that doesn't depend on observing rows to fire. See Handle expired timers.
Note
State objects are isolated by grouping keys with the following implications:
- State values can't be affected by rows associated with a different grouping key.
- You cannot implement logic that depends on comparing values or updating state across grouping keys.
You can compare values within a grouping key. Use a MapState to implement logic with a second key that your custom logic can use. For example, grouping by user_id and using ip_address for your MapState key allows you to track simultaneous user sessions.
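As a rough illustration of the second-key idea, the following plain-Python model nests one dictionary per grouping key. It is a sketch, not the Spark API: SimultaneousSessionModel and record_event are hypothetical names, the outer dictionary stands in for per-key isolation, and the inner dictionary stands in for a MapState keyed by ip_address.

```python
from typing import Dict


class SimultaneousSessionModel:
    """Outer key models the grouping key (user_id); inner key models the MapState key."""

    def __init__(self) -> None:
        self._state: Dict[str, Dict[str, int]] = {}

    def record_event(self, user_id: str, ip_address: str, seen_at_ms: int) -> int:
        # Only the map that belongs to this user_id is visible to the logic.
        per_user = self._state.setdefault(user_id, {})
        per_user[ip_address] = seen_at_ms  # insert or update one map entry
        return len(per_user)               # number of distinct IPs for this user


model = SimultaneousSessionModel()
print(model.record_event("u1", "10.0.0.1", 1_000))  # 1
print(model.record_event("u1", "10.0.0.2", 1_500))  # 2: two sessions for u1
print(model.record_event("u2", "10.0.0.1", 1_600))  # 1: u2 cannot see u1's entries
```

The comparison across sessions works because both IP addresses live under the same grouping key. Rows for u2 never touch the entries stored for u1.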
Advanced considerations for working with state
State updates are fault-tolerant. If a task crashes before a micro-batch has finished processing, the retry uses the value from the last successful micro-batch.
For optimized performance, Databricks recommends that you process all values in the iterator for a given key and commit updates in a single write. When you write to a state variable, this triggers a write to RocksDB.
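The difference between writing on every row and committing once can be seen with a small stand-in object that counts writes. CountingValueState and both helper functions are hypothetical. The stand-in mimics only the exists, get, and update calls used elsewhere on this page and does not touch a real state store.

```python
from typing import Iterable, Optional, Tuple


class CountingValueState:
    """Hypothetical stand-in for a ValueState that records how often it is written."""

    def __init__(self) -> None:
        self._value: Optional[Tuple[int]] = None
        self.writes = 0

    def exists(self) -> bool:
        return self._value is not None

    def get(self) -> Optional[Tuple[int]]:
        return self._value

    def update(self, new_value: Tuple[int]) -> None:
        self._value = new_value
        self.writes += 1  # in the real operator, each write reaches the state store


def count_with_write_per_row(state: CountingValueState, rows: Iterable[object]) -> None:
    for _ in rows:
        current = state.get()[0] if state.exists() else 0
        state.update((current + 1,))  # one write for every row


def count_with_single_write(state: CountingValueState, rows: Iterable[object]) -> None:
    current = state.get()[0] if state.exists() else 0
    for _ in rows:
        current += 1                  # accumulate in a local variable
    state.update((current,))          # one write for the whole iterator


per_row, single = CountingValueState(), CountingValueState()
count_with_write_per_row(per_row, range(1000))
count_with_single_write(single, range(1000))
print(per_row.get(), per_row.writes)  # (1000,) 1000
print(single.get(), single.writes)    # (1000,) 1
```

Both functions produce the same final value. Only the number of writes differs.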
State values do not have defaults. If your logic requires reading existing state information, use the exists method.
To implement logic for null state, MapState variables allow you to check for individual keys or list all keys.
Handle input rows
Use the handleInputRows method to define how your application processes rows and updates state values. This method runs each time your Structured Streaming query processes rows for a grouping key.
For most stateful applications implemented with transformWithState, the core logic is defined using handleInputRows.
For each micro-batch processed, all rows in the micro-batch for a given grouping key are available using an iterator. User-defined logic can interact with all rows from the current micro-batch and values in the state store.
Handle expired timers
Use the handleExpiredTimer method to implement custom logic based on elapsed time.
Within a grouping key, timers are uniquely identified by their timestamp.
When a timer expires, the result is determined by the logic implemented in your application. Common patterns include:
- Emitting information stored in a state variable.
- Evicting stored state information.
- Creating a new timer.
Expired timers fire even if no rows for their associated key are processed in a micro-batch.
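The two timer properties above (identity by timestamp within a key, and firing without input rows) can be modeled with a set of timestamps per key. TimerRegistry and its methods are hypothetical names for this sketch and are not part of the Spark API.

```python
from typing import Dict, List, Set, Tuple


class TimerRegistry:
    """Plain-Python model: timers are a set of expiry timestamps per grouping key."""

    def __init__(self) -> None:
        self._timers: Dict[str, Set[int]] = {}

    def register(self, key: str, expiry_ms: int) -> None:
        # A set means registering the same timestamp twice leaves one timer.
        self._timers.setdefault(key, set()).add(expiry_ms)

    def delete(self, key: str, expiry_ms: int) -> None:
        self._timers.get(key, set()).discard(expiry_ms)

    def list_timers(self, key: str) -> List[int]:
        return sorted(self._timers.get(key, set()))

    def fire_expired(self, now_ms: int) -> List[Tuple[str, int]]:
        # Runs once per micro-batch for all keys, whether or not they received rows.
        fired: List[Tuple[str, int]] = []
        for key in sorted(self._timers):
            due = sorted(t for t in self._timers[key] if t <= now_ms)
            self._timers[key].difference_update(due)
            fired.extend((key, t) for t in due)
        return fired


registry = TimerRegistry()
registry.register("a", 1_000)
registry.register("a", 1_000)           # same timestamp: still a single timer
registry.register("a", 2_000)
print(registry.list_timers("a"))        # [1000, 2000]
print(registry.fire_expired(1_500))     # [('a', 1000)]
print(registry.list_timers("a"))        # [2000]
```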
Specify the time mode
When passing your StatefulProcessor to transformWithState, you must specify the time mode using the timeMode parameter.
The following options are supported:
| Time mode | Description |
|---|---|
| ProcessingTime | Timers and TTL are both supported and are evaluated based on the wall-clock time when Apache Spark processes each micro-batch. Use ProcessingTime when you want timers to fire at a fixed interval relative to when rows are processed, regardless of timestamps in the data. |
| EventTime | Timers are supported and are evaluated based on the event-time watermark. The watermark advances as Apache Spark observes timestamps in the input data. TTL is not supported with EventTime. Use EventTime when your data contains timestamps and you want timers to fire based on the progress of those timestamps. When using EventTime, you must also specify the eventTimeColumnName parameter. See eventTimeColumnName. |
| NoTime or TimeMode.None() | Timers and TTL are not supported. Use NoTime when your stateful application does not require time-based logic. |
eventTimeColumnName
When using the EventTime time mode, the eventTimeColumnName parameter specifies the name of the column in your output schema that contains the event timestamp. Apache Spark uses this column to propagate the watermark to the output stream, enabling correct downstream time-based operations.
Python
eventTimeColumnName is an additional argument for transformWithState or transformWithStateInPandas:
q = (
    df.groupBy("key")
    .transformWithState(
        statefulProcessor=MyProcessor(),
        outputStructType=output_schema,
        outputMode="Append",
        timeMode="EventTime",
        eventTimeColumnName="outputTimestamp",
    )
    .writeStream...
)
Scala
transformWithState accepts eventTimeColumnName in place of timeMode. This approach always uses EventTime mode:
val q = spark
  .readStream
  .format("delta")
  .load(srcDeltaTableDir)
  .as[(String, String)]
  .groupByKey(x => x._1)
  .transformWithState(
    new MyProcessor(),
    "outputTimestamp",
    OutputMode.Append(),
  )
  .writeStream...
Built-in timer values
Databricks recommends against invoking the system clock in your custom stateful application, as this can lead to unreliable retries on task failure. Use the methods in the TimerValues class when you must access the processing time or watermark:
| TimerValues | Description |
|---|---|
| getCurrentProcessingTimeInMs | Returns the timestamp of the processing time for the current batch in milliseconds since epoch. |
| getCurrentWatermarkInMs | Returns the timestamp of the watermark for the current batch in milliseconds since epoch. |
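To see why a batch-supplied timestamp behaves better under retries than the system clock, compare the two hypothetical helpers below. The function names and the sample timestamp are illustrative. In a real processor, the batch_time_ms argument would come from a TimerValues method such as getCurrentProcessingTimeInMs.

```python
import time


def expiry_from_batch_time(batch_time_ms: int, delay_ms: int) -> int:
    """Derive a timer expiry from the timestamp the engine supplies for the batch."""
    return batch_time_ms + delay_ms


def expiry_from_system_clock(delay_ms: int) -> int:
    """Anti-pattern: the result depends on when the task happens to run."""
    return int(time.time() * 1000) + delay_ms


# The same micro-batch replayed after a task failure sees the same batch timestamp,
# so the first attempt and the retry compute an identical expiry.
batch_time_ms = 1_700_000_000_000
first_attempt = expiry_from_batch_time(batch_time_ms, 10_000)
retry = expiry_from_batch_time(batch_time_ms, 10_000)
print(first_attempt == retry)  # True
```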
Note
Processing time describes the time that the micro-batch is processed by Apache Spark. Many streaming sources, such as Kafka, also include system processing time.
Watermarks on streaming queries are often defined against event time or the processing time of the streaming source. See Apply watermarks to control data processing thresholds.
Both watermarks and windows can be used in combination with transformWithState. You might implement similar functionality in your custom stateful application by leveraging TTL, timers, and MapState or ListState functionality.
Time-to-live (TTL) for state types
To prevent out-of-memory errors and to remove stale state type values, transformWithState supports an optional time to live (TTL) value for each state type value. After expiration, TTL silently evicts state type values. TTL does not run handleExpiredTimer or any custom logic. To run code when state expires, use a timer instead.
Important
If you don't implement TTL, you must handle state eviction to avoid out-of-memory errors.
For all state types, TTL resets when updating state information. TTL is enforced for each state type value, with different rules for each state type:
- State variables are scoped to grouping keys.
- For ValueState objects, only a single value is stored per grouping key. TTL applies to this value.
- For ListState objects, the list can contain many values. TTL applies to each value in a list independently.
  - While TTL is scoped to individual values in a ListState, the only way to update an individual value is with the put method, which overwrites the entire contents of the ListState variable and resets TTL for all values in the list.
- For MapState objects, each map key has an associated state value. TTL applies independently to each key-value pair in a map.
Note
Timers allow you to define custom logic beyond state eviction, including emitting rows. Optionally, you can use timers to both clear state information for a given state value, and emit values or trigger conditional logic. See Handle expired timers.
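The TTL rules can be approximated with a plain-Python model in which every stored value carries its own expiry timestamp. TtlValueModel and TtlListModel are hypothetical classes, not Spark types. The explicit now_ms arguments and the choice to treat a value as expired when its expiry is less than or equal to the current time are assumptions made for the sketch.

```python
from typing import List, Optional, Tuple


class TtlValueModel:
    """Models a single value whose TTL resets only when the value is updated."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._entry: Optional[Tuple[object, int]] = None  # (value, expires_at_ms)

    def update(self, value: object, now_ms: int) -> None:
        self._entry = (value, now_ms + self.ttl_ms)

    def get(self, now_ms: int) -> Optional[object]:
        # Reading never extends the TTL; expired values disappear silently.
        if self._entry is None or self._entry[1] <= now_ms:
            return None
        return self._entry[0]


class TtlListModel:
    """Models a list in which each value expires independently."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._entries: List[Tuple[object, int]] = []

    def put(self, values: List[object], now_ms: int) -> None:
        # Overwrites the whole list, so every value gets a fresh expiry.
        self._entries = [(v, now_ms + self.ttl_ms) for v in values]

    def append_value(self, value: object, now_ms: int) -> None:
        # Only the appended value gets a new expiry; existing values keep theirs.
        self._entries.append((value, now_ms + self.ttl_ms))

    def get(self, now_ms: int) -> List[object]:
        return [v for v, expires_at in self._entries if expires_at > now_ms]


value = TtlValueModel(ttl_ms=1_000)
value.update("x", now_ms=0)
print(value.get(now_ms=500))     # x
print(value.get(now_ms=1_500))   # None: the read at 500 did not reset the TTL

items = TtlListModel(ttl_ms=1_000)
items.put(["a"], now_ms=0)
items.append_value("b", now_ms=600)
print(items.get(now_ms=1_200))   # ['b']: "a" expired on its own schedule
```

No callback runs when a value expires in this model, which mirrors the point that TTL eviction does not invoke handleExpiredTimer.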
Example stateful application
The following example defines a custom stateful processor, SimpleCounterProcessor, including example state variables. SimpleCounterProcessor uses ValueState, ListState, and MapState to count rows for each grouping key.
Python (Pandas)
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)
class SimpleCounterProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        value_state_schema = StructType([StructField("count", IntegerType(), True)])
        list_state_schema = StructType([StructField("count", IntegerType(), True)])
        self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
        self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
        # Schema can also be defined using strings and SQL DDL syntax
        self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        count = 0
        for pdf in rows:
            list_state_rows = [(120,), (20,)]  # A list of tuples
            self.list_state.put(list_state_rows)
            self.list_state.appendValue((111,))
            self.list_state.appendList(list_state_rows)
            pdf_count = pdf.count()
            count += pdf_count.get("value")
        self.value_state.update((count,))  # Count is passed as a tuple
        iter_list = self.list_state.get()  # Avoid shadowing the built-in iter
        list_state_value = next(iter_list)[0]
        value = count
        user_key = ("user_key",)
        if self.map_state.exists():
            if self.map_state.containsKey(user_key):
                value += self.map_state.getValue(user_key)[0]
        self.map_state.updateValue(user_key, (value,))  # Value is a tuple
        yield pd.DataFrame({"id": key, "countAsString": str(count)})
    def close(self) -> None:
        pass  # No resources to release
q = (
    df.groupBy("key")
    .transformWithStateInPandas(
        statefulProcessor=SimpleCounterProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream...
)
Python (row-based)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)
class SimpleCounterProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        value_state_schema = StructType([StructField("count", IntegerType(), True)])
        list_state_schema = StructType([StructField("count", IntegerType(), True)])
        self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
        self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
        self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
    def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
        count = 0
        for row in rows:
            list_state_rows = [(120,), (20,)]  # A list of tuples
            self.list_state.put(list_state_rows)
            self.list_state.appendValue((111,))
            self.list_state.appendList(list_state_rows)
            count += 1
        self.value_state.update((count,))  # Count is passed as a tuple
        iter_list = self.list_state.get()
        list_state_value = next(iter_list)[0]
        value = count
        user_key = ("user_key",)
        if self.map_state.exists():
            if self.map_state.containsKey(user_key):
                value += self.map_state.getValue(user_key)[0]
        self.map_state.updateValue(user_key, (value,))  # Value is a tuple
        yield Row(id=key[0], countAsString=str(count))  # The grouping key is a tuple
    def close(self) -> None:
        pass  # No resources to release
q = (
    df.groupBy("key")
    .transformWithState(
        statefulProcessor=SimpleCounterProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _
  private val longEncoder = Encoders.scalaLong
  private val intEncoder = Encoders.scalaInt
  private val stringEncoder = Encoders.STRING
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      intEncoder, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      intEncoder, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      stringEncoder, intEncoder, TTLConfig.NONE)
  }
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}
val q = spark
  .readStream
  .format("delta")
  .load(srcDeltaTableDir) // Pass the path variable, not the string literal "$srcDeltaTableDir"
  .as[(String, String)]
  .groupByKey(x => x._1)
  .transformWithState(
    new SimpleCounterProcessor(),
    TimeMode.None(),
    OutputMode.Update(),
  )
  .writeStream...
For more examples, see Example stateful applications.
Note
In Python, state values are tuples. Pass tuples to put and update, and expect tuples from get.
For example, if the schema for your ValueState is a single integer:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Use this approach for items in a ListState or values in a MapState too.
Emit rows
You must use handleInputRows or handleExpiredTimer to define how transformWithState emits rows for each grouping key. See Handle input rows and Handle expired timers.
Custom stateful applications make no assumptions about how to use state information. For a given condition, the application might emit no rows, one row, or many rows.
Note
You can implement multiple state values and define multiple conditions for emitting rows, but all rows must use the same schema.
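As a small illustration of emitting zero, one, or many rows under a single schema, the following plain-Python generator uses dictionaries as row stand-ins. The function emit_for_key, the threshold parameter, and the field names are hypothetical. A real processor would yield Row objects or pandas DataFrames from handleInputRows or handleExpiredTimer instead.

```python
from typing import Dict, Iterator, List

RowLike = Dict[str, object]  # hypothetical stand-in for an output row


def emit_for_key(key: str, values: List[int], threshold: int) -> Iterator[RowLike]:
    """Yield no rows, one summary row, or one row per value, all with the same fields."""
    if not values:
        return  # condition 1: nothing to report, so emit no rows
    total = sum(values)
    if total < threshold:
        # condition 2: emit a single summary row
        yield {"id": key, "metric": "total", "value": total}
        return
    for v in values:
        # condition 3: emit many rows, each using the same three fields
        yield {"id": key, "metric": "item", "value": v}


print(list(emit_for_key("a", [], 10)))         # []
print(list(emit_for_key("a", [1, 2], 10)))     # [{'id': 'a', 'metric': 'total', 'value': 3}]
print(len(list(emit_for_key("a", [5, 6], 10))))  # 2
```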
Python (Pandas)
With transformWithStateInPandas, define your output schema with the outputStructType keyword.
Emit rows using a pandas DataFrame object and yield.
Optionally, you can yield an empty DataFrame. If you use update output mode and emit an empty DataFrame, this updates the values for the grouping key to be null.
Python (row-based)
With transformWithState, define your output schema with the outputStructType keyword.
Emit rows using a Row object and yield.
Optionally, you can return an empty iterator. If you use update output mode and emit an empty iterator, this updates the values for the grouping key to be null.
Scala
In Scala, you emit rows using an Iterator object. The schema derives itself automatically from the schema of the emitted rows.
Optionally, you can return an empty Iterator. If you use update output mode and emit an empty Iterator, this updates the values for the grouping key to be null.
Handle initial state
Optionally, you can pass an initial state to the first micro-batch.
For example, you might use this to:
- Migrate an existing workflow to a new custom application.
- Upgrade a stateful operator to change your schema or logic.
- Repair a failure that can't be automatically repaired and requires manual intervention.
Note
Use the state store reader to query state information from an existing checkpoint. See Read Structured Streaming state information.
If you are converting an existing Delta table to a stateful application, read the table using spark.read.table("table_name") and pass the resulting DataFrame. You can optionally select or modify fields to conform to your new stateful application.
You provide an initial state using a DataFrame with the same grouping key schema as the input rows.
Note
Python uses handleInitialState to specify the initial state while defining a StatefulProcessor. Scala uses the distinct class StatefulProcessorWithInitialState.
The following example seeds a per-key counter from an existing Delta table:
Python (row-based)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
class CounterWithInitialState(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        state_schema = StructType([StructField("count", IntegerType(), True)])
        self.count_state = handle.getValueState("countState", state_schema)
    def handleInitialState(self, key, initialState: Row, timerValues) -> None:
        self.count_state.update((initialState["count"],))
    def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
        count = self.count_state.get()[0] if self.count_state.exists() else 0
        for _ in rows:
            count += 1
        self.count_state.update((count,))
        yield Row(id=key[0], count=count)
    def close(self) -> None:
        pass
output_schema = StructType([
    StructField("id", StringType(), True),
    StructField("count", IntegerType(), True),
])
# Load existing counts as initial state. It must use the same grouping key as the input.
initial_state = spark.read.table("existing_counts").groupBy("id")
q = (
    df.groupBy("id")
    .transformWithState(
        statefulProcessor=CounterWithInitialState(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
        initialState=initial_state,
    )
    .writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
class CounterWithInitialState
    extends StatefulProcessorWithInitialState[String, (String, String), (String, String), (String, Int)] {
  @transient private var countState: ValueState[Int] = _
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt, TTLConfig.NONE)
  }
  override def handleInitialState(
      key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
    countState.update(initialState._2)
  }
  override def handleInputRows(
      key: String,
      rows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    val count = if (countState.exists()) countState.get() else 0
    val newCount = count + rows.size
    countState.update(newCount)
    Iterator((key, newCount.toString))
  }
}
// Load existing counts as initial state. It must use the same grouping key as the input.
val initialState = spark.read.table("existing_counts")
  .as[(String, Int)]
  .groupByKey(_._1)
val q = spark
  .readStream
  .format("delta")
  .load(srcDeltaTableDir)
  .as[(String, String)]
  .groupByKey(_._1)
  .transformWithState(
    new CounterWithInitialState(),
    TimeMode.None(),
    OutputMode.Update(),
    initialState,
  )
  .writeStream...
Use transformWithState in Lakeflow Spark Declarative Pipelines
Use the transformWithState operator within Lakeflow Spark Declarative Pipelines to implement arbitrary stateful logic in your streaming pipelines using Python.
To do so, complete the following steps:
- Define the output schema and the stateful processor logic for your arbitrary stateful transformations. For examples, see Example stateful applications.
- Create a Lakeflow Spark Declarative Pipelines flow that invokes the transformWithState operator on a DataFrame. See Tutorial: Create your first pipeline using the Lakeflow Pipelines Editor.
- Run your pipeline and validate the results on the target table or sink.
For an example that uses transformWithState to monitor sensor heartbeats, see Example: Use transformWithState to monitor sensor heartbeats.