Streams

A Stream in kstreams is an extension of AIOKafkaConsumer.

Consuming can be done using kstreams.Stream. You only need to decorate a coroutine with @stream_engine.stream. The decorator takes the same arguments at initialization as the aiokafka consumer: it accepts the same args and kwargs that AIOKafkaConsumer accepts.

kstreams.streams.Stream

Attributes:

Name Type Description
name Optional[str]

Stream name. Default is a generated uuid4

topics List[str]

List of topics to consume

subscribe_by_pattern bool

Whether to subscribe to topics by pattern

backend Kafka

Backend to connect to (kstreams.backends.kafka.Kafka). Default is Kafka

func Callable[[Stream], Awaitable[Any]]

Coroutine function or generator to be called when an event arrives

config Dict[str, Any]

Stream configuration. Here all the properties can be passed in the dictionary

deserializer Deserializer

Deserializer to be used when an event is consumed

initial_offsets List[TopicPartitionOffset]

List of TopicPartitionOffset to seek to when the stream starts

rebalance_listener RebalanceListener

Listener callbacks invoked when partitions are assigned or revoked

Subscribe to a topic

Example

import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--kstreams", group_id="my-group-id")
async def stream(cr: ConsumerRecord) -> None:
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def start():
    await stream_engine.start()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(
        start(),
        stop_on_unhandled_errors=True,
        shutdown_callback=shutdown
    )

Subscribe to multiple topics

Consuming from multiple topics using one stream is possible. A List[str] of topics must be provided.

Example

import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(
    ["local--kstreams", "local--hello-world"],
    group_id="my-group-id",
)
async def consume(cr: ConsumerRecord) -> None:
    print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")

Subscribe to topics by pattern

In the following example the stream will subscribe to any topic that matches the regex ^dev--customer-.*, for example dev--customer-invoice or dev--customer-profile. The subscribe_by_pattern flag must be set to True.

Example

import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(
    topics="^dev--customer-.*$",
    subscribe_by_pattern=True,
    group_id="my-group-id",
)
async def stream(cr: ConsumerRecord) -> None:
    if cr.topic == "dev--customer-invoice":
        print("Event from topic dev--customer-invoice")
    elif cr.topic == "dev--customer-profile":
        print("Event from topic dev--customer-profile")
    else:
        raise ValueError(f"Invalid topic {cr.topic}")


async def start():
    await stream_engine.start()

async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(
        start(),
        stop_on_unhandled_errors=True,
        shutdown_callback=shutdown
    )
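
Before deploying a pattern subscription, it can help to check locally which topic names the regex would cover. The following is a standalone sketch using Python's re module; the topic names are made up for illustration, and the Kafka client's own matching may differ in detail.

```python
import re

# Same pattern as in the stream definition above
pattern = re.compile(r"^dev--customer-.*$")

# Hypothetical topic names, used only for illustration
topics = [
    "dev--customer-invoice",
    "dev--customer-profile",
    "dev--orders",
    "prod--customer-invoice",
]

# Keep only the topics that the pattern matches
matching = [topic for topic in topics if pattern.match(topic)]
print(matching)
```

Only the two dev--customer-* topics are kept; the other names do not start with the dev--customer- prefix.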

Dependency Injection

The old way to iterate over a stream is with the async for _ in stream loop. The iterable approach works, but in most cases end users are only interested in the ConsumerRecord. For this reason, the async for loop can be removed by using type hints. The available type hints are:

  • ConsumerRecord: The aiokafka ConsumerRecord that will be received every time that a new event is in the Stream
  • Stream: The Stream object that is subscribed to the topic/s. Useful when manual commit is enabled or when other Stream operations are needed
  • Send: Coroutine to produce events. The same as stream_engine.send(...)

If you use type hints, then every time a new event arrives in the stream, the coroutine function defined by the end user is awaited with arguments of the specified types.

@stream_engine.stream(topic)
async def my_stream(cr: ConsumerRecord):
    print(cr.value)


@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream):
    print(cr.value)
    await stream.commit()


@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream, send: Send):
    print(cr.value)
    await stream.commit()
    await send("sink-to-elastic-topic", value=cr.value)


@stream_engine.stream(topic)
async def consume(stream):  # you can specify the type but it will be the same result
    async for cr in stream:
        print(cr.value)
        # you can do something with the stream as well!!

Note

The type arguments can be in any order. This might change in the future.

Warning

It is still possible to use the async for in loop, but it might be removed in the future. Migrate to the typing approach
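
To illustrate the general idea behind injection by type hints, here is a minimal sketch built only on the standard library. It is not the kstreams implementation: ConsumerRecord and Stream are stand-in classes, and resolve_arguments and dispatch are hypothetical helpers that inspect a coroutine's annotations and pass each parameter the object of the matching type.

```python
import asyncio
import inspect
import typing


class ConsumerRecord:  # stand-in for kstreams.ConsumerRecord (assumption)
    def __init__(self, value: bytes) -> None:
        self.value = value


class Stream:  # stand-in for kstreams.Stream (assumption)
    pass


def resolve_arguments(
    func: typing.Callable[..., typing.Any],
    available: typing.Dict[type, typing.Any],
) -> typing.Dict[str, typing.Any]:
    """Map each annotated parameter of func to the available object of that type."""
    hints = typing.get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        if annotation in available:
            kwargs[name] = available[annotation]
    return kwargs


async def dispatch(func, cr: ConsumerRecord, stream: Stream):
    """Await func with only the arguments it asked for, matched by type."""
    kwargs = resolve_arguments(func, {ConsumerRecord: cr, Stream: stream})
    return await func(**kwargs)


# The parameter order does not matter because matching is done by type
async def handler(stream: Stream, cr: ConsumerRecord) -> bytes:
    return cr.value
```

Because arguments are matched by annotation rather than by position, a handler can declare its parameters in any order, which is consistent with the note above.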

Creating a Stream instance

If for any reason you need to create Stream instances directly, you can do so without using the stream_engine.stream decorator.

Stream instance
import aiorun
from kstreams import create_engine, Stream, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


class MyDeserializer:

    async def deserialize(self, consumer_record: ConsumerRecord, **kwargs):
        return consumer_record.value.decode()


async def stream(cr: ConsumerRecord) -> None:
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


stream = Stream(
    "local--kstreams",
    name="my-stream",
    func=stream,  # coroutine or async generator
    deserializer=MyDeserializer(),
)
# add the stream to the engine
stream_engine.add_stream(stream)


async def start():
    await stream_engine.start()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)

Removing a stream from the engine

Removing stream
stream_engine.remove_stream(stream)

Starting the stream with initial offsets

If you want to start your consumption from certain offsets, you can include that in your stream instantiation.

Use case: This feature is useful if one wants to manage their own offsets, rather than committing consumed offsets to Kafka. When an application manages its own offsets and tries to start a stream, we start the stream using the initial offsets as defined in the database.

If you try to seek on a partition or topic that is not assigned to your stream, the code will ignore the seek and print out a warning. For example, if you have two consumers that are consuming from different partitions, and you try to seek for all of the partitions on each consumer, each consumer will seek for the partitions it has been assigned, and it will print out a warning log for the ones it was not assigned.

If you try to seek on offsets that are not yet present on your partition, the consumer will revert to the auto_offset_reset config. There will not be a warning, so be aware of this.

Also be aware that when your application restarts, it will most likely trigger the initial_offsets again. This means that setting initial_offsets to a hardcoded number might not give the results you expect.

Initial Offsets from Database
from kstreams import Stream, structs


topic_name = "local--kstreams"
db_table = ExampleDatabase()
initial_offset = structs.TopicPartitionOffset(topic=topic_name, partition=0, offset=db_table.offset)


async def my_stream(stream: Stream):
    ...


stream = Stream(
    topic_name,
    name="my-stream",
    func=my_stream,  # coroutine or async generator
    deserializer=MyDeserializer(),
    initial_offsets=[initial_offset],
)
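
The behaviour described above for unassigned partitions can be pictured with a small standalone sketch. It is an illustration only, not the library's code: TopicPartitionOffset is a stand-in with the same three fields used in the example, and select_seekable is a hypothetical helper that keeps the offsets belonging to assigned partitions and logs a warning for the rest.

```python
import logging
import typing

logger = logging.getLogger(__name__)


class TopicPartitionOffset(typing.NamedTuple):
    # Stand-in for kstreams.structs.TopicPartitionOffset (assumption)
    topic: str
    partition: int
    offset: int


def select_seekable(
    initial_offsets: typing.List[TopicPartitionOffset],
    assigned: typing.Set[typing.Tuple[str, int]],
) -> typing.List[TopicPartitionOffset]:
    """Return the offsets whose (topic, partition) is assigned; warn about the others."""
    seekable = []
    for tpo in initial_offsets:
        if (tpo.topic, tpo.partition) in assigned:
            seekable.append(tpo)
        else:
            logger.warning(
                "Ignoring seek for %s partition %s: not assigned to this consumer",
                tpo.topic,
                tpo.partition,
            )
    return seekable
```

With two consumers that each own one partition, each of them would seek only on its own partition and log a warning for the other one.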

Stream crashing

If your stream crashes for any reason, event consumption is stopped, meaning that no event will be consumed from the topic. However, it is possible to set one of four error policies per stream:

  • StreamErrorPolicy.STOP (default): Stop the Stream when an exception occurs. The exception is raised after the stream is properly stopped.
  • StreamErrorPolicy.RESTART: Stop and restart the Stream when an exception occurs. The event that caused the exception is skipped. The exception is NOT raised because the application should continue working; however, logger.exception() is used to alert the user.
  • StreamErrorPolicy.STOP_ENGINE: Stop the StreamEngine when an exception occurs. The exception is raised after ALL the Streams were properly stopped.
  • StreamErrorPolicy.STOP_APPLICATION: Stop the StreamEngine when an exception occurs and raise signal.SIGTERM. Useful when using kstreams with other libraries such as FastAPI.

In the following example, the StreamErrorPolicy.RESTART error policy is specified. If the Stream crashes with a ValueError exception, it is restarted:

from kstreams import create_engine, ConsumerRecord
from kstreams.stream_utils import StreamErrorPolicy

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(
    "local--hello-world",
    group_id="example-group",
    error_policy=StreamErrorPolicy.RESTART
)
async def stream(cr: ConsumerRecord) -> None:
    if cr.key == b"error":
        # Stream will be restarted after the ValueError is raised
        raise ValueError("error....")

    print(f"Event consumed. Payload {cr.value}")

We can see the logs:

ValueError: error....
INFO:aiokafka.consumer.group_coordinator:LeaveGroup request succeeded
INFO:aiokafka.consumer.consumer:Unsubscribed all topics or patterns and assigned partitions
INFO:kstreams.streams:Stream consuming from topics ['local--hello-world'] has stopped!!! 


INFO:kstreams.middleware.middleware:Restarting stream <kstreams.streams.Stream object at 0x102d44050>
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--hello-world'})
...
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--hello-world', partition=0)} for group example-group

Note

If you are using aiorun with stop_on_unhandled_errors=True and the error_policy is StreamErrorPolicy.RESTART then the application will NOT stop as the exception that caused the Stream to crash is not raised
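
The difference between the STOP and RESTART policies can be sketched with a toy consumer loop. This is a simplified simulation under stated assumptions, not how kstreams is implemented: ErrorPolicy is a hypothetical stand-in for StreamErrorPolicy covering only two of the policies, and consume does not actually stop or restart a Kafka consumer.

```python
import asyncio
import enum
import logging
import typing

logger = logging.getLogger(__name__)


class ErrorPolicy(enum.Enum):
    # Hypothetical stand-in for two of the StreamErrorPolicy members
    STOP = "STOP"
    RESTART = "RESTART"


async def consume(
    events: typing.List[bytes],
    handler: typing.Callable[[bytes], typing.Awaitable[None]],
    policy: ErrorPolicy,
) -> typing.List[bytes]:
    """Feed events to handler and return the events that were processed."""
    processed = []
    for event in events:
        try:
            await handler(event)
        except Exception:
            if policy is ErrorPolicy.RESTART:
                # Log the failure, skip the failing event and keep consuming
                logger.exception("Handler failed, skipping event %r", event)
                continue
            # STOP: no further events are consumed and the exception propagates
            raise
        processed.append(event)
    return processed


async def handler(event: bytes) -> None:
    if event == b"error":
        raise ValueError("error....")
```

With RESTART the failing event is skipped and later events are still processed, whereas with STOP the ValueError reaches the caller.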

Changing consumer behavior

Most of the time you will only set the topic and the group_id on the consumer, but sometimes you might want more control over it, for example to change the policy for resetting offsets on OffsetOutOfRange errors or the session timeout. To do this, use the same kwargs as the aiokafka consumer API.

# The consumer is considered dead if no heartbeat is received within 500 ms (session timeout)
# On OffsetOutOfRange errors, the offset will move to the oldest available message ("earliest")

@stream_engine.stream("local--kstream", group_id="de-my-partition", session_timeout_ms=500, auto_offset_reset="earliest")
async def stream(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

Manual commit

When processing more sensitive data, you may want to be sure that the Kafka offset is committed only once you have finished your tasks. In that case, use the consumer's enable_auto_commit=False mode.

Manual commit example
@stream_engine.stream("local--kstream", group_id="de-my-partition", enable_auto_commit=False)
async def stream(cr: ConsumerRecord, stream: Stream):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

    # We need to make sure that the payload was stored before committing the Kafka offset
    # (store_in_database is a placeholder for your own persistence coroutine)
    await store_in_database(cr.value)
    await stream.commit()  # You need to commit!!!

Note

This moves the delivery guarantee from at most once to at least once. To achieve exactly once, you will need to save offsets in the destination database and validate them yourself.
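
One way to picture saving and validating offsets in the destination is an idempotent sink that records the last applied offset next to the data. The sketch below is an assumption-laden illustration: OffsetTrackingStore is a hypothetical in-memory stand-in for a real database, where both writes would normally happen in a single transaction.

```python
import typing


class OffsetTrackingStore:
    """In-memory stand-in for a destination database (hypothetical)."""

    def __init__(self) -> None:
        self.rows: typing.List[bytes] = []
        # Last applied offset, keyed by (topic, partition)
        self.offsets: typing.Dict[typing.Tuple[str, int], int] = {}

    def apply(self, topic: str, partition: int, offset: int, value: bytes) -> bool:
        """Store value unless this offset was already applied. Return True if stored."""
        key = (topic, partition)
        if offset <= self.offsets.get(key, -1):
            # Duplicate delivery, for example after a restart or a rebalance
            return False
        # In a real database these two writes would share one transaction
        self.rows.append(value)
        self.offsets[key] = offset
        return True
```

A stream handler could call apply with cr.topic, cr.partition, cr.offset and cr.value before committing, so that a redelivered event is detected and skipped.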

Yield from stream

Sometimes it is useful to yield values from a stream, either so you can consume events at your own pace or because you want to return results to the frontend (SSE example). If you use the yield keyword inside a coroutine, it is transformed into an asynchronous generator function, meaning that calling it returns an async generator that can be consumed.

Consuming an async generator is simple: you just use the async for in clause. Because events are consumed only while the loop runs, you have to make sure that the Stream has been started properly and that, after leaving the async for in, the stream has been properly stopped.

To facilitate the process, there is a context manager that takes care of starting and stopping the stream.

Yield example
# Create your stream
@stream_engine.stream("local--kstream")
async def stream(cr: ConsumerRecord, stream: Stream):
    yield cr.value


# Consume the stream:
async with stream as stream_flow:  # Use the context manager
    async for value in stream_flow:
        ...
        # do something with value (cr.value)

Note

If for some reason you interrupt the "async for in" in the async generator, the Stream will stop consuming events, meaning that the lag will increase.

Note

Yield from a stream only works with the typing approach
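
The start/stop guarantee that the context manager provides can be illustrated with a standard-library sketch. FakeStream is a hypothetical stand-in, not the kstreams Stream: it only shows how an async context manager can wrap an async generator so that setup and teardown happen around the async for loop.

```python
import asyncio
import typing


class FakeStream:
    """Stand-in for a stream that yields values (not the kstreams implementation)."""

    def __init__(self, values: typing.Iterable[bytes]) -> None:
        self.values = list(values)
        self.running = False

    async def __aenter__(self) -> "FakeStream":
        self.running = True  # comparable to starting the consumer
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.running = False  # comparable to stopping the consumer

    def __aiter__(self) -> typing.AsyncIterator[bytes]:
        return self._generate()

    async def _generate(self) -> typing.AsyncIterator[bytes]:
        for value in self.values:
            yield value


async def collect(stream: FakeStream) -> typing.List[bytes]:
    received = []
    async with stream as stream_flow:  # started on enter, stopped on exit
        async for value in stream_flow:
            received.append(value)
    return received
```

Leaving the async with block, whether normally or through an exception, runs __aexit__, which is why the stream ends up stopped even if the loop is interrupted.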

Get many

Get a batch of events from the assigned TopicPartition.

Prefetched events are returned in batches by topic-partition. If messages are not available in the prefetched buffer, this method waits timeout_ms milliseconds.

Attributes:

Name Type Description
partitions List[TopicPartition] | None

The partitions to fetch messages from. If no partition is specified, all subscribed partitions are used

timeout_ms int | None

Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are currently available in the buffer, else returns empty. Must not be negative.

max_records int | None

The maximum number of records to fetch. If timeout_ms was defined and reached, and the fetched records have not reached max_records, then returns immediately with any records that are currently available in the buffer

Returns:

Type Description
Dict[TopicPartition, List[ConsumerRecord]]

Mapping of each TopicPartition to its list of records

Example

@stream_engine.stream(topic, ...)
async def stream(stream: Stream):
    while True:
        data = await stream.getmany(max_records=5)
        print(data)
Source code in kstreams/streams.py
async def getmany(
    self,
    partitions: typing.Optional[typing.List[TopicPartition]] = None,
    timeout_ms: int = 0,
    max_records: typing.Optional[int] = None,
) -> typing.Dict[TopicPartition, typing.List[ConsumerRecord]]:
    """
    Get a batch of events from the assigned TopicPartition.

    Prefetched events are returned in batches by topic-partition.
    If messages is not available in the prefetched buffer this method waits
    `timeout_ms` milliseconds.

    Attributes:
        partitions List[TopicPartition] | None: The partitions that need
            fetching message. If no one partition specified then all
            subscribed partitions will be used
        timeout_ms int | None: milliseconds spent waiting if
            data is not available in the buffer. If 0, returns immediately
            with any records that are available currently in the buffer,
            else returns empty. Must not be negative.
        max_records int | None: The amount of records to fetch.
            if `timeout_ms` was defined and reached and the fetched records
            has not reach `max_records` then returns immediately
            with any records that are available currently in the buffer

    Returns:
        Topic to list of records

    !!! Example
        ```python
        @stream_engine.stream(topic, ...)
        async def stream(stream: Stream):
            while True:
                data = await stream.getmany(max_records=5)
                print(data)
        ```
    """
    partitions = partitions or []
    return await self.consumer.getmany(  # type: ignore
        *partitions, timeout_ms=timeout_ms, max_records=max_records
    )

Warning

This approach does not work with Dependency Injection.
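
Since getmany returns a dictionary keyed by topic-partition, a handler usually iterates over its items. The following sketch shows one way to summarize such a batch; TopicPartition and Record are simplified stand-ins for the real classes (only the fields used here are modelled), and summarize is a hypothetical helper.

```python
import typing


class TopicPartition(typing.NamedTuple):  # stand-in for the real TopicPartition
    topic: str
    partition: int


class Record(typing.NamedTuple):  # stand-in for ConsumerRecord, reduced to two fields
    offset: int
    value: bytes


def summarize(
    batch: typing.Dict[TopicPartition, typing.List[Record]],
) -> typing.Dict[TopicPartition, typing.Tuple[int, int]]:
    """Return (number of records, last offset) for every non-empty partition."""
    return {
        tp: (len(records), records[-1].offset)
        for tp, records in batch.items()
        if records
    }
```

In a real stream, the dictionary returned by await stream.getmany(max_records=5) could be processed partition by partition in the same way.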

Rebalance Listener

In some cases you will need a RebalanceListener so that different actions can be performed when partitions are assigned to or revoked from the stream.

Use cases

  • Cleanup or custom state save on the start of a rebalance operation
  • Saving offsets in a custom store when a partition is revoked
  • Load a state or cache warmup on completion of a successful partition re-assignment.

Metrics Rebalance Listener

kstreams uses a default listener for all the streams to clean the metrics after a rebalance takes place.

kstreams.MetricsRebalanceListener

Source code in kstreams/rebalance_listener.py
class MetricsRebalanceListener(RebalanceListener):
    async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
        """
        Coroutine to be called *before* a rebalance operation starts and
        *after* the consumer stops fetching data.

        This will method will clean up the `Prometheus` metrics

        Attributes:
            revoked Set[TopicPartitions]: Partitions that were assigned
                to the consumer on the last rebalance
        """
        # lock all asyncio Tasks so no new metrics will be added to the Monitor
        if revoked and self.engine is not None:
            async with asyncio.Lock():
                if self.stream is not None and self.stream.consumer is not None:
                    self.engine.monitor.clean_stream_consumer_metrics(
                        self.stream.consumer
                    )

    async def on_partitions_assigned(
        self, assigned: typing.Set[TopicPartition]
    ) -> None:
        """
        Coroutine to be called *after* partition re-assignment completes
        and *before* the consumer starts fetching data again.

        This method will start the `Prometheus` metrics

        Attributes:
            assigned Set[TopicPartition]: Partitions assigned to the
                consumer (may include partitions that were previously assigned)
        """
        # lock all asyncio Tasks so no new metrics will be added to the Monitor
        if assigned and self.engine is not None:
            async with asyncio.Lock():
                if self.stream is not None:
                    self.stream.seek_to_initial_offsets()

on_partitions_assigned(assigned) async

Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.

This method will start the Prometheus metrics

Attributes:

Name Type Description
assigned Set[TopicPartition]

Partitions assigned to the consumer (may include partitions that were previously assigned)

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

This method will clean up the Prometheus metrics

Attributes:

Name Type Description
revoked Set[TopicPartition]

Partitions that were assigned to the consumer on the last rebalance

Manual Commit

If manual commit is enabled, you might want to use the ManualCommitRebalanceListener. This rebalance listener will call commit before the stream partitions are revoked to avoid the error CommitFailedError and duplicate message delivery after a rebalance. See code example with manual commit.

kstreams.ManualCommitRebalanceListener

Source code in kstreams/rebalance_listener.py
class ManualCommitRebalanceListener(MetricsRebalanceListener):
    async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
        """
        Coroutine to be called *before* a rebalance operation starts and
        *after* the consumer stops fetching data.

        If manual commit is enabled, `commit` is called before the consumers
        partitions are revoked to prevent the error `CommitFailedError`
        and duplicate message delivery after a rebalance.

        Attributes:
            revoked Set[TopicPartitions]: Partitions that were assigned
                to the consumer on the last rebalance
        """
        if (
            revoked
            and self.stream is not None
            and self.stream.consumer is not None
            and not self.stream.consumer._enable_auto_commit
        ):
            logger.info(
                f"Manual commit enabled for stream {self.stream}. "
                "Performing `commit` before revoking partitions"
            )
            async with asyncio.Lock():
                await self.stream.commit()

            await super().on_partitions_revoked(revoked=revoked)

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

If manual commit is enabled, commit is called before the consumer's partitions are revoked to prevent the error CommitFailedError and duplicate message delivery after a rebalance.

Attributes:

Name Type Description
revoked Set[TopicPartition]

Partitions that were assigned to the consumer on the last rebalance

Note

ManualCommitRebalanceListener also includes the MetricsRebalanceListener functionality.

Custom Rebalance Listener

If you want to define a custom RebalanceListener, it has to inherit from kstreams.RebalanceListener.

kstreams.RebalanceListener

A callback interface that the user can implement to trigger custom actions when partitions are assigned to or revoked from the Stream.

Example

from typing import Set

from kstreams import RebalanceListener, Stream, TopicPartition
from .resource import stream_engine


class MyRebalanceListener(RebalanceListener):

    async def on_partitions_revoked(
        self, revoked: Set[TopicPartition]
    ) -> None:
        # Do something with the revoked partitions
        # or with the Stream
        print(self.stream)

    async def on_partitions_assigned(
        self, assigned: Set[TopicPartition]
    ) -> None:
        # Do something with the assigned partitions
        # or with the Stream
        print(self.stream)


@stream_engine.stream(topic, rebalance_listener=MyRebalanceListener())
async def my_stream(stream: Stream):
    async for event in stream:
        ...
Source code in kstreams/rebalance_listener.py
class RebalanceListener(ConsumerRebalanceListener):
    """
    A callback interface that the user can implement to trigger custom actions
    when the set of partitions are assigned or revoked to the `Stream`.

    !!! Example
        ```python
        from kstreams import RebalanceListener, TopicPartition
        from .resource import stream_engine


        class MyRebalanceListener(RebalanceListener):

            async def on_partitions_revoked(
                self, revoked: Set[TopicPartition]
            ) -> None:
                # Do something with the revoked partitions
                # or with the Stream
                print(self.stream)

            async def on_partitions_assigned(
                self, assigned: Set[TopicPartition]
            ) -> None:
                # Do something with the assigned partitions
                # or with the Stream
                print(self.stream)


        @stream_engine.stream(topic, rebalance_listener=MyRebalanceListener())
        async def my_stream(stream: Stream):
            async for event in stream:
                ...
        ```
    """

    def __init__(self) -> None:
        self.stream: typing.Optional["Stream"] = None
        # engine added so it can react on rebalance events
        self.engine: typing.Optional["StreamEngine"] = None

    async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
        """
        Coroutine to be called *before* a rebalance operation starts and
        *after* the consumer stops fetching data.

        If you are using manual commit you have to commit all consumed offsets
        here, to avoid duplicate message delivery after rebalance is finished.

        Use cases:
            - cleanup or custom state save on the start of a rebalance operation
            - saving offsets in a custom store

        Attributes:
            revoked Set[TopicPartitions]: Partitions that were assigned
                to the consumer on the last rebalance

        !!! note
            The `Stream` is available using `self.stream`
        """
        ...  # pragma: no cover

    async def on_partitions_assigned(
        self, assigned: typing.Set[TopicPartition]
    ) -> None:
        """
        Coroutine to be called *after* partition re-assignment completes
        and *before* the consumer starts fetching data again.

        It is guaranteed that all the processes in a consumer group will
        execute their `on_partitions_revoked` callback before any instance
        executes its `on_partitions_assigned` callback.

        Use cases:
            - Load a state or cache warmup on completion of a successful
            partition re-assignment.

        Attributes:
            assigned Set[TopicPartition]: Partitions assigned to the
                consumer (may include partitions that were previously assigned)

        !!! note
            The `Stream` is available using `self.stream`
        """
        ...  # pragma: no cover

on_partitions_assigned(assigned) async

Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.

It is guaranteed that all the processes in a consumer group will execute their on_partitions_revoked callback before any instance executes its on_partitions_assigned callback.

Use cases
  • Load a state or cache warmup on completion of a successful partition re-assignment.

Attributes:

Name Type Description
assigned Set[TopicPartition]

Partitions assigned to the consumer (may include partitions that were previously assigned)

Note

The Stream is available using self.stream

on_partitions_revoked(revoked) async

Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.

If you are using manual commit you have to commit all consumed offsets here, to avoid duplicate message delivery after rebalance is finished.

Use cases
  • cleanup or custom state save on the start of a rebalance operation
  • saving offsets in a custom store

Attributes:

Name Type Description
revoked Set[TopicPartition]

Partitions that were assigned to the consumer on the last rebalance

Note

The Stream is available using self.stream

Note

It is also possible to inherit from ManualCommitRebalanceListener and MetricsRebalanceListener.