Rebalance Listener
When partitions are assigned to or revoked from a stream, a rebalance is triggered. Different actions can be performed, depending on your use case, for example:
- Cleanup or custom state save at the start of a rebalance operation
- Saving offsets in a custom store when a partition is revoked
- Loading state or warming up a cache on completion of a successful partition re-assignment
To do that, you will need a RebalanceListener.
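The use cases above can be sketched as a small simulation. This is a minimal sketch, not the library's implementation: `TopicPartition` is a stand-in namedtuple, `StateListener`, `offset_store`, and the `orders` topic are hypothetical names, and the callbacks are driven by hand so the example runs without Kafka. In a real application the class would inherit from `kstreams.RebalanceListener` and be passed to the stream.

```python
import asyncio
from collections import namedtuple
from typing import Dict, Set

# Stand-in for kstreams.TopicPartition so the sketch runs without Kafka.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class StateListener:
    """Hypothetical listener; a real one would subclass kstreams.RebalanceListener."""

    def __init__(self) -> None:
        self.positions: Dict[TopicPartition, int] = {}  # progress made so far
        self.offset_store: Dict[TopicPartition, int] = {}  # stand-in for an external store
        self.cache: Dict[TopicPartition, int] = {}

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # Save progress for the partitions being revoked, then clean up.
        for tp in revoked:
            if tp in self.positions:
                self.offset_store[tp] = self.positions.pop(tp)
            self.cache.pop(tp, None)

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # Warm the cache from the store; unknown partitions start at offset 0.
        for tp in assigned:
            self.cache[tp] = self.offset_store.get(tp, 0)


async def simulate() -> StateListener:
    listener = StateListener()
    tp0, tp1 = TopicPartition("orders", 0), TopicPartition("orders", 1)
    await listener.on_partitions_assigned({tp0, tp1})
    listener.positions[tp0] = 42  # pretend offset 42 was reached on partition 0
    # A rebalance: the revoke callback runs first, then the new assignment.
    await listener.on_partitions_revoked({tp0, tp1})
    await listener.on_partitions_assigned({tp0})
    return listener


listener = asyncio.run(simulate())
print(listener.cache)
```

After the simulated rebalance, the cache holds only partition 0 of `orders`, restored to the offset that was saved when the partition was revoked.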
Metrics Rebalance Listener
kstreams uses a default listener for all streams to clean up the metrics after a rebalance takes place.
kstreams.MetricsRebalanceListener
on_partitions_assigned(assigned)
async
Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.
This method will start the Prometheus metrics
Attributes:
| Name | Type | Description |
|---|---|---|
| assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned) |
on_partitions_revoked(revoked)
async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
This method will clean up the Prometheus metrics
Attributes:
| Name | Type | Description |
|---|---|---|
| revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Manual Commit
If manual commit is enabled, you might want to use the ManualCommitRebalanceListener. This rebalance listener calls commit before the stream partitions are revoked, to avoid the CommitFailedError error and duplicate message delivery after a rebalance. See the code example with manual commit.
kstreams.ManualCommitRebalanceListener
on_partitions_revoked(revoked)
async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
If manual commit is enabled, commit is called before the consumer's partitions are revoked, to prevent the CommitFailedError error and duplicate message delivery after a rebalance.
Attributes:
| Name | Type | Description |
|---|---|---|
| revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Note
ManualCommitRebalanceListener also includes the MetricsRebalanceListener functionality.
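The commit-before-revoke behavior described in this section can be illustrated with stand-ins. This is a sketch of the pattern only, not the library's source: `FakeStream`, `CommitOnRevokeListener`, the `enable_auto_commit` attribute, and the `payments` topic are hypothetical names chosen so the example runs without Kafka. In a real application you would pass `ManualCommitRebalanceListener()` as the `rebalance_listener` of a stream instead.

```python
import asyncio
from typing import List, Set, Tuple

# Stand-in for kstreams.TopicPartition: (topic, partition).
TP = Tuple[str, int]


class FakeStream:
    """Hypothetical stream that only records calls; not the kstreams Stream."""

    def __init__(self, enable_auto_commit: bool) -> None:
        self.enable_auto_commit = enable_auto_commit
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")


class CommitOnRevokeListener:
    """Illustrates the pattern only; not the library's actual source."""

    def __init__(self, stream: FakeStream) -> None:
        self.stream = stream

    async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
        # Commit while this consumer still owns the partitions.
        if not self.stream.enable_auto_commit:
            await self.stream.commit()
        self.stream.calls.append(f"revoked {sorted(revoked)}")


manual = FakeStream(enable_auto_commit=False)
auto = FakeStream(enable_auto_commit=True)
partitions = {("payments", 0)}
asyncio.run(CommitOnRevokeListener(manual).on_partitions_revoked(partitions))
asyncio.run(CommitOnRevokeListener(auto).on_partitions_revoked(partitions))
print(manual.calls)
print(auto.calls)
```

With manual commit, the recorded calls show the commit happening before the revocation is handled. With auto commit, the listener does not commit.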
Custom Rebalance Listener
If you want to define a custom RebalanceListener, it has to inherit from kstreams.RebalanceListener.
kstreams.RebalanceListener
A callback interface that the user can implement to trigger custom actions
when partitions are assigned to or revoked from the Stream.
Example
from typing import Set

from kstreams import RebalanceListener, Stream, TopicPartition

from .resource import stream_engine


class MyRebalanceListener(RebalanceListener):
    async def on_partitions_revoked(
        self, revoked: Set[TopicPartition]
    ) -> None:
        # Do something with the revoked partitions
        # or with the Stream
        print(self.stream)

    async def on_partitions_assigned(
        self, assigned: Set[TopicPartition]
    ) -> None:
        # Do something with the assigned partitions
        # or with the Stream
        print(self.stream)


# "my-topic" is a placeholder topic name
@stream_engine.stream("my-topic", rebalance_listener=MyRebalanceListener())
async def my_stream(stream: Stream):
    async for event in stream:
        ...
on_partitions_assigned(assigned)
async
Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.
It is guaranteed that all the processes in a consumer group will
execute their on_partitions_revoked callback before any instance
executes its on_partitions_assigned callback.
Use cases
- Load a state or cache warmup on completion of a successful partition re-assignment.
Attributes:
| Name | Type | Description |
|---|---|---|
| assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned) |
Note
The Stream is available using self.stream
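Because the assigned set may include partitions that were previously assigned, a cache warmup may want to skip partitions it already handled. The following is one possible sketch under stated assumptions: `TopicPartition` is a stand-in namedtuple, `WarmupListener`, `owned`, `warmed`, and the `events` topic are hypothetical names, and the callback is called by hand rather than by a consumer. Whether state should survive a revocation is a design choice that depends on the application.

```python
import asyncio
from collections import namedtuple
from typing import List, Set

# Stand-in for kstreams.TopicPartition so the sketch runs without Kafka.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class WarmupListener:
    """Hypothetical listener; a real one would subclass kstreams.RebalanceListener."""

    def __init__(self) -> None:
        self.owned: Set[TopicPartition] = set()
        self.warmed: List[TopicPartition] = []  # records each warmup performed

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # Only warm up partitions that were not owned before this rebalance.
        newly_assigned = assigned - self.owned
        for tp in sorted(newly_assigned):
            self.warmed.append(tp)  # placeholder for the real warmup work
        self.owned = set(assigned)


async def simulate() -> WarmupListener:
    warmup = WarmupListener()
    tp0, tp1, tp2 = (TopicPartition("events", n) for n in range(3))
    await warmup.on_partitions_assigned({tp0, tp1})
    # Second rebalance keeps tp1, drops tp0, and adds tp2.
    await warmup.on_partitions_assigned({tp1, tp2})
    return warmup


warmup = asyncio.run(simulate())
print(warmup.warmed)
```

In this run, partition 1 is warmed up once even though it appears in both assignments.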
on_partitions_revoked(revoked)
async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
If you are using manual commit, you have to commit all consumed offsets here to avoid duplicate message delivery after the rebalance is finished.
Use cases
- cleanup or custom state save on the start of a rebalance operation
- saving offsets in a custom store
Attributes:
| Name | Type | Description |
|---|---|---|
| revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Note
The Stream is available using self.stream
Note
It is also possible to inherit from ManualCommitRebalanceListener and MetricsRebalanceListener.
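When inheriting from one of the built-in listeners, an overridden callback would normally call the parent implementation so that the built-in behavior is kept. The sketch below shows that pattern with a stand-in parent class: `BuiltInListener` is hypothetical and only records what a built-in listener is described as doing, and `AuditListener` and the `events` topic are also illustrative names. In a real application the parent would be `kstreams.ManualCommitRebalanceListener` or `kstreams.MetricsRebalanceListener`.

```python
import asyncio
from typing import List, Set, Tuple

# Stand-in for kstreams.TopicPartition: (topic, partition).
TP = Tuple[str, int]


class BuiltInListener:
    """Hypothetical stand-in for a built-in listener; it only records calls."""

    def __init__(self) -> None:
        self.log: List[str] = []

    async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
        self.log.append("built-in: revoked")

    async def on_partitions_assigned(self, assigned: Set[TP]) -> None:
        self.log.append("built-in: assigned")


class AuditListener(BuiltInListener):
    async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
        # Custom action first, then keep the parent's behavior.
        self.log.append(f"custom: losing {sorted(revoked)}")
        await super().on_partitions_revoked(revoked)


async def simulate() -> AuditListener:
    audit = AuditListener()
    await audit.on_partitions_revoked({("events", 0)})
    await audit.on_partitions_assigned({("events", 0)})  # inherited unchanged
    return audit


audit = asyncio.run(simulate())
print(audit.log)
```

Leaving out the `super()` call would drop the parent's behavior for that callback, which for the built-in listeners means the commit or the metrics cleanup described above.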