
Rebalance Listener

When a rebalance is triggered, partitions are revoked from and assigned to a stream. Depending on your use case, different actions can be performed, for example:

  • Clean up or save custom state at the start of a rebalance operation
  • Save offsets in a custom store when a partition is revoked
  • Load state or warm up a cache once a partition re-assignment has completed successfully
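These use cases map onto the two listener callbacks. The sketch below is a minimal, stdlib-only illustration, not kstreams code: `TopicPartition` is a local stand-in for the `(topic, partition)` pair, and `StateAwareListener` with its `state`, `saved` and `cache` dicts is hypothetical. In a real application the class would inherit from `kstreams.RebalanceListener`, and `saved` would be a durable store rather than a dict.

```python
import asyncio
from typing import Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


class StateAwareListener:
    """Hypothetical listener mapping the use cases to the two callbacks."""

    def __init__(self) -> None:
        self.state: Dict[TopicPartition, int] = {}  # in-memory per-partition state
        self.saved: Dict[TopicPartition, int] = {}  # stand-in for a durable store
        self.cache: Dict[TopicPartition, int] = {}  # cache warmed after assignment

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # Rebalance is starting: persist and drop the state of revoked partitions
        for tp in revoked:
            if tp in self.state:
                self.saved[tp] = self.state.pop(tp)

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # Re-assignment finished: warm the cache from previously saved state
        for tp in assigned:
            self.cache[tp] = self.saved.get(tp, 0)


async def main() -> StateAwareListener:
    listener = StateAwareListener()
    tp0, tp1 = TopicPartition("events", 0), TopicPartition("events", 1)
    listener.state = {tp0: 10, tp1: 20}
    await listener.on_partitions_revoked({tp0, tp1})  # revoke runs first
    await listener.on_partitions_assigned({tp1})      # then the new assignment
    return listener


listener = asyncio.run(main())
```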

To do that, you will need a RebalanceListener.

Metrics Rebalance Listener

kstreams uses a default listener for all streams to clean up the metrics after a rebalance takes place.

kstreams.MetricsRebalanceListener

on_partitions_assigned(assigned) async

Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.

This method starts the Prometheus metrics.

Attributes:

Name | Type | Description
--- | --- | ---
assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned)

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

This method cleans up the Prometheus metrics.

Attributes:

Name | Type | Description
--- | --- | ---
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance
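As a rough mental model of what "start" and "clean up" mean here, the sketch below keeps one gauge sample per assigned partition and drops the samples of revoked partitions. It assumes nothing about kstreams internals: `GaugeRegistry` is a hypothetical stand-in for a Prometheus gauge labelled by topic and partition, and `MetricsListenerSketch` is not the real `MetricsRebalanceListener`.

```python
import asyncio
from typing import Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


class GaugeRegistry:
    """Hypothetical stand-in for a gauge keyed by (topic, partition) labels."""

    def __init__(self) -> None:
        self.samples: Dict[TopicPartition, float] = {}

    def set(self, tp: TopicPartition, value: float) -> None:
        self.samples[tp] = value

    def remove(self, tp: TopicPartition) -> None:
        # Dropping the label set stops the series from being exported
        self.samples.pop(tp, None)


class MetricsListenerSketch:
    def __init__(self, registry: GaugeRegistry) -> None:
        self.registry = registry

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # Remove series for partitions this consumer may no longer own,
        # so stale values are not reported after the rebalance
        for tp in revoked:
            self.registry.remove(tp)

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # Start tracking every assigned partition (some may be re-assigned)
        for tp in assigned:
            self.registry.set(tp, 0.0)


async def rebalance() -> GaugeRegistry:
    registry = GaugeRegistry()
    listener = MetricsListenerSketch(registry)
    old = {TopicPartition("events", 0), TopicPartition("events", 1)}
    new = {TopicPartition("events", 1), TopicPartition("events", 2)}
    await listener.on_partitions_assigned(old)  # initial assignment
    await listener.on_partitions_revoked(old)   # rebalance: revoke first
    await listener.on_partitions_assigned(new)  # then assign the new set
    return registry


registry = asyncio.run(rebalance())
```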

Manual Commit

If manual commit is enabled, you might want to use the ManualCommitRebalanceListener. This rebalance listener calls commit before the stream's partitions are revoked, to avoid a CommitFailedError and duplicate message delivery after a rebalance. See the code example with manual commit.
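A minimal simulation of why committing on revocation matters, assuming Kafka's convention that the committed offset is the next offset to read (last processed offset + 1). `FakeConsumer`, `CommitOnRevoke` and `duplicates_after_rebalance` are hypothetical names used only for illustration; the real ManualCommitRebalanceListener calls commit for you, as described above.

```python
import asyncio
from typing import Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


class FakeConsumer:
    """Hypothetical in-memory consumer; commit() stores the next offset to read."""

    def __init__(self) -> None:
        self.position: Dict[TopicPartition, int] = {}   # next offset to read
        self.committed: Dict[TopicPartition, int] = {}  # group's committed offsets

    async def commit(self) -> None:
        # Commit the current position, i.e. last processed offset + 1
        self.committed.update(self.position)


class CommitOnRevoke:
    def __init__(self, consumer: FakeConsumer) -> None:
        self.consumer = consumer

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # Commit while this consumer still owns the partitions
        if revoked:
            await self.consumer.commit()


def duplicates_after_rebalance(committed: int, next_unprocessed: int) -> int:
    # The next owner resumes at `committed`; every offset before
    # `next_unprocessed` was already processed here, so that gap is redelivered
    return max(0, next_unprocessed - committed)


async def main() -> FakeConsumer:
    tp = TopicPartition("events", 0)
    consumer = FakeConsumer()
    consumer.committed[tp] = 0
    for offset in range(5):            # process offsets 0..4 without committing
        consumer.position[tp] = offset + 1
    await CommitOnRevoke(consumer).on_partitions_revoked({tp})
    return consumer


consumer = asyncio.run(main())
```

Without the commit in `on_partitions_revoked`, the committed offset would stay at 0 and the five processed records would be delivered again to the next owner.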

kstreams.ManualCommitRebalanceListener

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

If manual commit is enabled, commit is called before the consumer's partitions are revoked, to prevent a CommitFailedError and duplicate message delivery after a rebalance.

Attributes:

Name | Type | Description
--- | --- | ---
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance

Note

ManualCommitRebalanceListener also includes the MetricsRebalanceListener functionality.

Custom Rebalance Listener

If you want to define a custom RebalanceListener, it has to inherit from kstreams.RebalanceListener.

kstreams.RebalanceListener

A callback interface that the user can implement to trigger custom actions when partitions are assigned to or revoked from the Stream.

Example

from typing import Set

from kstreams import RebalanceListener, Stream, TopicPartition
from .resource import stream_engine

topic = "my-topic"  # example topic name, replace with your own


class MyRebalanceListener(RebalanceListener):

    async def on_partitions_revoked(
        self, revoked: Set[TopicPartition]
    ) -> None:
        # Do something with the revoked partitions
        # or with the Stream
        print(self.stream)

    async def on_partitions_assigned(
        self, assigned: Set[TopicPartition]
    ) -> None:
        # Do something with the assigned partitions
        # or with the Stream
        print(self.stream)


@stream_engine.stream(topic, rebalance_listener=MyRebalanceListener())
async def my_stream(stream: Stream):
    async for event in stream:
        ...

on_partitions_assigned(assigned) async

Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.

It is guaranteed that all the processes in a consumer group will execute their on_partitions_revoked callback before any instance executes its on_partitions_assigned callback.
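The ordering guarantee can be pictured as two phases. The sketch below is a simulation only: in practice Kafka's group protocol enforces this ordering across processes, and `RecordingListener` and `rebalance` are hypothetical helpers. It illustrates why state that one instance persists externally in on_partitions_revoked is safe for another instance to read in on_partitions_assigned.

```python
import asyncio
from typing import Dict, List, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


class RecordingListener:
    """Hypothetical listener that records the order of its callbacks."""

    def __init__(self, name: str, events: List[str]) -> None:
        self.name = name
        self.events = events

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        self.events.append(f"{self.name}:revoked")

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        self.events.append(f"{self.name}:assigned")


async def rebalance(
    members: Dict[str, RecordingListener],
    old: Dict[str, Set[TopicPartition]],
    new: Dict[str, Set[TopicPartition]],
) -> None:
    # Phase 1: every member runs on_partitions_revoked
    await asyncio.gather(*(m.on_partitions_revoked(old[n]) for n, m in members.items()))
    # Phase 2: only after all revocations finish does anyone get new partitions
    await asyncio.gather(*(m.on_partitions_assigned(new[n]) for n, m in members.items()))


events: List[str] = []
members = {n: RecordingListener(n, events) for n in ("a", "b")}
tps = [TopicPartition("events", i) for i in range(2)]
asyncio.run(
    rebalance(
        members,
        old={"a": set(tps), "b": set()},
        new={"a": {tps[0]}, "b": {tps[1]}},
    )
)
```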

Use cases
  • Load state or warm up a cache once a partition re-assignment has completed successfully.

Attributes:

Name | Type | Description
--- | --- | ---
assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned)

Note

The Stream is available as self.stream.

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

If you are using manual commit, you have to commit all consumed offsets here to avoid duplicate message delivery after the rebalance is finished.

Use cases
  • Clean up or save custom state at the start of a rebalance operation
  • Save offsets in a custom store
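A sketch of the "save offsets in a custom store" use case, assuming offsets are kept outside Kafka and restored when partitions are assigned again. `OffsetStoreListener`, the dict-based `store` and the `seek` hook are hypothetical; with a real consumer, the hook would move the partition's fetch position.

```python
import asyncio
from typing import Callable, Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


class OffsetStoreListener:
    """Hypothetical listener that keeps offsets outside Kafka."""

    def __init__(
        self,
        store: Dict[TopicPartition, int],
        positions: Dict[TopicPartition, int],
        seek: Callable[[TopicPartition, int], None],
    ) -> None:
        self.store = store          # stand-in for an external DB or key-value store
        self.positions = positions  # next offset to read, updated while consuming
        self.seek = seek            # hypothetical hook that moves the read position

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # Save where each revoked partition should resume
        for tp in revoked:
            if tp in self.positions:
                self.store[tp] = self.positions[tp]

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        # Resume each assigned partition from its stored offset, if there is one
        for tp in assigned:
            if tp in self.store:
                self.seek(tp, self.store[tp])


store: Dict[TopicPartition, int] = {}
seeks: Dict[TopicPartition, int] = {}


def record_seek(tp: TopicPartition, offset: int) -> None:
    # Records the seek instead of talking to a real consumer
    seeks[tp] = offset


tp = TopicPartition("events", 3)
listener = OffsetStoreListener(store, {tp: 42}, record_seek)


async def rebalance() -> None:
    await listener.on_partitions_revoked({tp})
    await listener.on_partitions_assigned({tp, TopicPartition("events", 4)})


asyncio.run(rebalance())
```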

Attributes:

Name | Type | Description
--- | --- | ---
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance

Note

The Stream is available as self.stream.

Note

It is also possible to inherit from ManualCommitRebalanceListener and MetricsRebalanceListener.
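When extending one of the built-in listeners, overriding a callback replaces the parent's behaviour unless you call it explicitly. The sketch assumes the parent callbacks are coroutines with the signatures documented above; `CommitListenerStandIn` is a hypothetical stand-in for a class such as ManualCommitRebalanceListener, so the example runs without kstreams. With the real class, the same `await super().on_partitions_revoked(revoked)` call keeps the commit-before-revoke behaviour.

```python
import asyncio
from typing import List, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Stand-in for the (topic, partition) pair used by kstreams
    topic: str
    partition: int


calls: List[str] = []


class CommitListenerStandIn:
    """Hypothetical stand-in for a built-in listener."""

    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        calls.append("commit")          # the parent's built-in behaviour

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        calls.append("metrics-start")


class MyListener(CommitListenerStandIn):
    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        calls.append("flush-state")     # custom work first...
        await super().on_partitions_revoked(revoked)  # ...then keep the parent's work

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        await super().on_partitions_assigned(assigned)
        calls.append("warm-cache")      # custom work after the parent's setup


async def main() -> None:
    listener = MyListener()
    tps = {TopicPartition("events", 0)}
    await listener.on_partitions_revoked(tps)
    await listener.on_partitions_assigned(tps)


asyncio.run(main())
```

Placing the custom work before the `super()` call in on_partitions_revoked means anything it flushes happens before the parent's commit runs.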