Streams
A Stream in kstreams is an extension of AIOKafkaConsumer.
Consuming can be done using kstreams.Stream. You only need to decorate a coroutine with @stream_engine.stream. The decorator has the same API as the aiokafka consumer at initialization; in other words, it accepts the same args and kwargs that the aiokafka consumer accepts.
kstreams.streams.Stream
Attributes:
Name | Type | Description |
---|---|---|
name | Optional[str] | Stream name. Default is a generated uuid4 |
topics | List[str] | List of topics to consume |
subscribe_by_pattern | bool | Whether to subscribe to topics by pattern |
backend | Kafka | Backend to connect to (kstreams.backends.kafka.Kafka) |
func | Callable[[Stream], Awaitable[Any]] | Coroutine function or generator to be called when an event arrives |
config | Dict[str, Any] | Stream configuration. All the properties can be passed in this dictionary |
deserializer | Deserializer | Deserializer to be used when an event is consumed |
initial_offsets | List[TopicPartitionOffset] | List of TopicPartitionOffset that the stream will start consuming from |
rebalance_listener | RebalanceListener | Listener callbacks invoked when partitions are assigned or revoked |
Subscribe to a topic
Example
import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstreams", group_id="my-group-id")
async def stream(cr: ConsumerRecord) -> None:
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

async def start():
    await stream_engine.start()

async def shutdown(loop):
    await stream_engine.stop()

if __name__ == "__main__":
    aiorun.run(
        start(),
        stop_on_unhandled_errors=True,
        shutdown_callback=shutdown
    )
Subscribe to multiple topics
Consuming from multiple topics using one stream is possible. A List[str] of topics must be provided.
Example
import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream(
    ["local--kstreams", "local--hello-world"],
    group_id="my-group-id",
)
async def consume(cr: ConsumerRecord) -> None:
    print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
Subscribe to topics by pattern
In the following example the stream will subscribe to any topic that matches the regex ^dev--customer-.*, for example dev--customer-invoice or dev--customer-profile. The subscribe_by_pattern flag must be set to True.
Example
import aiorun
from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream(
    topics="^dev--customer-.*$",
    subscribe_by_pattern=True,
    group_id="my-group-id",
)
async def stream(cr: ConsumerRecord) -> None:
    if cr.topic == "dev--customer-invoice":
        print("Event from topic dev--customer-invoice")
    elif cr.topic == "dev--customer-profile":
        print("Event from topic dev--customer-profile")
    else:
        raise ValueError(f"Invalid topic {cr.topic}")

async def start():
    await stream_engine.start()

async def shutdown(loop):
    await stream_engine.stop()

if __name__ == "__main__":
    aiorun.run(
        start(),
        stop_on_unhandled_errors=True,
        shutdown_callback=shutdown
    )
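To preview which topics a pattern would pick up, the regex can be tried against candidate names with Python's re module. This is a standalone sketch that does not talk to Kafka: the topic list is made up for illustration, and it assumes the pattern is applied with Python regex semantics, matched from the start of the topic name.

```python
import re

# Same pattern as in the example above
pattern = re.compile(r"^dev--customer-.*$")

# Hypothetical topic names, for illustration only
candidate_topics = [
    "dev--customer-invoice",
    "dev--customer-profile",
    "dev--orders",
    "prod--customer-invoice",
]

# Keep only the names the pattern accepts
matching = [topic for topic in candidate_topics if pattern.match(topic)]
print(matching)
```

Only the two dev--customer-* names pass the filter, which is why the else branch in the stream above should be unreachable as long as no other matching topic exists in the cluster.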
Dependency Injection
The old way to iterate over a stream is with the async for _ in stream loop. The iterable approach works, but in most cases end users are only interested in the ConsumerRecord. For this reason, the async for loop can be removed by using proper type hints. The available type hints are:
- ConsumerRecord: The aiokafka ConsumerRecord that is received every time a new event is in the Stream
- Stream: The Stream object that is subscribed to the topic/s. Useful when manual commit is enabled or when other Stream operations are needed
- Send: Coroutine to produce events. The same as stream_engine.send(...)
If you use type hints, then every time a new event is in the stream, the coroutine function defined by the end user will be awaited with the specified types.
@stream_engine.stream(topic)
async def my_stream(cr: ConsumerRecord):
    print(cr.value)

@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream):
    print(cr.value)
    await stream.commit()

@stream_engine.stream(topic, enable_auto_commit=False)
async def my_stream(cr: ConsumerRecord, stream: Stream, send: Send):
    print(cr.value)
    await stream.commit()
    await send("sink-to-elastic-topic", value=cr.value)

@stream_engine.stream(topic)
async def consume(stream):  # you can specify the type but it will be the same result
    async for cr in stream:
        print(cr.value)
        # you can do something with the stream as well!!
Note
The type arguments can be in any order. This might change in the future.
Warning
It is still possible to use the async for in loop, but it might be removed in the future. Migrate to the typing approach.
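To make the idea of type-hint-based injection more concrete, here is a small standalone sketch of how a caller can inspect a coroutine's annotations and pass only the objects it asks for, in any order. It is not the kstreams implementation: ConsumerRecord and Stream below are stand-in classes, and call_with_injection is a hypothetical helper.

```python
import asyncio
import inspect
import typing


class ConsumerRecord:  # stand-in for the real ConsumerRecord
    def __init__(self, value: bytes) -> None:
        self.value = value


class Stream:  # stand-in for the real Stream
    def __init__(self) -> None:
        self.commits = 0

    async def commit(self) -> None:
        self.commits += 1


async def call_with_injection(func, available: dict):
    """Await func, passing only the objects whose types it annotates."""
    hints = typing.get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        if annotation in available:
            kwargs[name] = available[annotation]
    return await func(**kwargs)


async def only_record(cr: ConsumerRecord):
    return cr.value


async def record_and_stream(stream: Stream, cr: ConsumerRecord):
    # Argument order does not matter, only the annotations do
    await stream.commit()
    return cr.value


cr = ConsumerRecord(b"hello")
stream = Stream()
available = {ConsumerRecord: cr, Stream: stream}

result_one = asyncio.run(call_with_injection(only_record, available))
result_two = asyncio.run(call_with_injection(record_and_stream, available))
print(result_one, result_two, stream.commits)
```

Matching on the annotation rather than on the parameter name is what lets the same event be delivered to functions with different signatures.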
Creating a Stream instance
If for any reason you need to create Stream instances directly, you can do it without using the decorator stream_engine.stream.
import aiorun
from kstreams import create_engine, Stream, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")

class MyDeserializer:
    async def deserialize(self, consumer_record: ConsumerRecord, **kwargs):
        return consumer_record.value.decode()

async def consume(cr: ConsumerRecord) -> None:
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

stream = Stream(
    "local--kstreams",
    name="my-stream",
    func=consume,  # coroutine or async generator
    deserializer=MyDeserializer(),
)
# add the stream to the engine
stream_engine.add_stream(stream)

async def start():
    await stream_engine.start()

async def shutdown(loop):
    await stream_engine.stop()

if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
Removing a stream from the engine
stream_engine.remove_stream(stream)
Starting the stream with initial offsets
If you want to start your consumption from certain offsets, you can include that in your stream instantiation.
Use case: This feature is useful if one wants to manage their own offsets, rather than committing consumed offsets to Kafka. When an application manages its own offsets and tries to start a stream, we start the stream using the initial offsets as defined in the database.
If you try to seek on a partition or topic that is not assigned to your stream, the code will ignore the seek and print out a warning. For example, if you have two consumers that are consuming from different partitions, and you try to seek for all of the partitions on each consumer, each consumer will seek for the partitions it has been assigned, and it will print out a warning log for the ones it was not assigned.
If you try to seek on offsets that are not yet present on your partition, the consumer will revert to the auto_offset_reset config. There will not be a warning, so be aware of this.
Also be aware that when your application restarts, it will most likely trigger the initial_offsets again. This means that setting initial_offsets to a hardcoded number might not give the results you expect.
from kstreams import Stream, structs

topic_name = "local--kstreams"
db_table = ExampleDatabase()  # placeholder for your own offset store
initial_offset = structs.TopicPartitionOffset(
    topic=topic_name, partition=0, offset=db_table.offset
)

async def my_stream(stream: Stream):
    ...

stream = Stream(
    topic_name,
    name="my-stream",
    func=my_stream,  # coroutine or async generator
    deserializer=MyDeserializer(),  # defined as in the previous example
    initial_offsets=[initial_offset],
)
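The example leaves ExampleDatabase undefined. One possible shape for such a store, sketched with SQLite from the standard library, is shown below. Everything here is an assumption for illustration: OffsetStore, its table layout, and the NamedTuple that stands in for structs.TopicPartitionOffset (it only mirrors the three fields used above).

```python
import sqlite3
from typing import List, NamedTuple


class TopicPartitionOffset(NamedTuple):
    # Stand-in mirroring the fields passed to structs.TopicPartitionOffset
    topic: str
    partition: int
    offset: int


class OffsetStore:
    """Hypothetical offset store backed by SQLite."""

    def __init__(self, path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS offsets ("
            "topic TEXT, part INTEGER, next_offset INTEGER, "
            "PRIMARY KEY (topic, part))"
        )

    def save(self, topic: str, partition: int, offset: int) -> None:
        # One row per topic-partition; a newer offset replaces the older one
        with self.conn:
            self.conn.execute(
                "INSERT OR REPLACE INTO offsets (topic, part, next_offset) "
                "VALUES (?, ?, ?)",
                (topic, partition, offset),
            )

    def load(self) -> List[TopicPartitionOffset]:
        rows = self.conn.execute(
            "SELECT topic, part, next_offset FROM offsets ORDER BY topic, part"
        ).fetchall()
        return [TopicPartitionOffset(*row) for row in rows]


store = OffsetStore()
store.save("local--kstreams", 0, 42)
store.save("local--kstreams", 1, 7)
store.save("local--kstreams", 0, 43)  # overwrites the value for partition 0

initial_offsets = store.load()
print(initial_offsets)
```

Because the offsets are read from the store on every start, a restart resumes from the last saved position instead of a hardcoded number.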
Stream crashing
If your stream crashes for any reason, the event consumption is stopped, meaning that no events will be consumed from the topic. However, it is possible to set one of four error policies per stream:
- StreamErrorPolicy.STOP (default): Stop the Stream when an exception occurs. The exception is raised after the stream is properly stopped.
- StreamErrorPolicy.RESTART: Stop and restart the Stream when an exception occurs. The event that caused the exception is skipped. The exception is NOT raised because the application should continue working; however, logger.exception() is used to alert the user.
- StreamErrorPolicy.STOP_ENGINE: Stop the StreamEngine when an exception occurs. The exception is raised after ALL the Streams were properly stopped.
- StreamErrorPolicy.STOP_APPLICATION: Stop the StreamEngine when an exception occurs and raise signal.SIGTERM. Useful when using kstreams with other libraries such as FastAPI.
In the following example, the StreamErrorPolicy.RESTART error policy is specified. If the Stream crashes with the ValueError exception, it is restarted:
from kstreams import create_engine, ConsumerRecord
from kstreams.stream_utils import StreamErrorPolicy

stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream(
    "local--hello-world",
    group_id="example-group",
    error_policy=StreamErrorPolicy.RESTART
)
async def stream(cr: ConsumerRecord) -> None:
    if cr.key == b"error":
        # Stream will be restarted after the ValueError is raised
        raise ValueError("error....")
    print(f"Event consumed. Payload {cr.value}")
We can see the logs:
ValueError: error....
INFO:aiokafka.consumer.group_coordinator:LeaveGroup request succeeded
INFO:aiokafka.consumer.consumer:Unsubscribed all topics or patterns and assigned partitions
INFO:kstreams.streams:Stream consuming from topics ['local--hello-world'] has stopped!!!
INFO:kstreams.middleware.middleware:Restarting stream <kstreams.streams.Stream object at 0x102d44050>
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--hello-world'})
...
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--hello-world', partition=0)} for group example-group
Note
If you are using aiorun with stop_on_unhandled_errors=True and the error_policy is StreamErrorPolicy.RESTART, then the application will NOT stop, as the exception that caused the Stream to crash is not raised.
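The difference between stopping and restarting can be modelled without Kafka. The sketch below is a simplified simulation, not the kstreams middleware: ErrorPolicy and run are hypothetical names, only two of the policies are modelled, and a "restart" is reduced to skipping the failing event and continuing.

```python
import asyncio
import enum
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger("error-policy-sketch")


class ErrorPolicy(enum.Enum):
    # Only two of the policies are modelled here
    STOP = "stop"
    RESTART = "restart"


async def run(
    events: List[bytes],
    handler: Callable[[bytes], Awaitable[None]],
    policy: ErrorPolicy,
) -> None:
    for event in events:
        try:
            await handler(event)
        except Exception:
            if policy is ErrorPolicy.STOP:
                raise  # consumption ends and the caller sees the exception
            # RESTART: log, skip the failing event and keep consuming
            logger.exception("handler failed, skipping event")


processed: List[bytes] = []


async def handler(event: bytes) -> None:
    if event == b"error":
        raise ValueError("error....")
    processed.append(event)


events = [b"a", b"error", b"b"]

asyncio.run(run(events, handler, ErrorPolicy.RESTART))
after_restart = list(processed)

processed.clear()
try:
    asyncio.run(run(events, handler, ErrorPolicy.STOP))
    stop_raised = False
except ValueError:
    stop_raised = True
after_stop = list(processed)
```

With the restart behaviour the event after the failure is still processed; with the stop behaviour it is not, and the exception reaches the caller.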
Changing consumer behavior
Most of the time you will only set the topic and the group_id for the consumer, but sometimes you might want more control over it, for example changing the policy for resetting offsets on OffsetOutOfRange errors or the session timeout. To do this, use the same kwargs as the aiokafka consumer API.
# The consumer is considered dead if no heartbeat is received within 500 ms (session timeout)
# On OffsetOutOfRange errors, the offset will move to the oldest available message ("earliest")
@stream_engine.stream(
    "local--kstream",
    group_id="de-my-partition",
    session_timeout_ms=500,
    auto_offset_reset="earliest",
)
async def stream(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
Manual commit
When processing more sensitive data, you may want to be sure that the Kafka offset is committed only once you have finished your tasks. To do so, use the enable_auto_commit=False mode of the consumer.
@stream_engine.stream("local--kstream", group_id="de-my-partition", enable_auto_commit=False)
async def stream(cr: ConsumerRecord, stream: Stream):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
    # We need to make sure that the payload was stored before committing the Kafka offset
    await store_in_database(cr.value)  # store_in_database is a placeholder for your own storage coroutine
    await stream.commit()  # You need to commit!!!
Note
This is a tradeoff that moves from at-most-once to at-least-once delivery. To achieve exactly-once, you will need to save offsets in the destination database and validate them yourself.
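One way to read "save offsets in the destination database and validate them" is to write the payload and its offset in a single transaction and to skip any record whose offset was already stored. The sketch below illustrates that idea with SQLite; the table layout and the store_once helper are hypothetical and not part of kstreams.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE payloads (topic TEXT, part INTEGER, record_offset INTEGER, value BLOB)"
)
conn.execute(
    "CREATE TABLE progress (topic TEXT, part INTEGER, last_offset INTEGER, "
    "PRIMARY KEY (topic, part))"
)


def store_once(topic: str, partition: int, offset: int, value: bytes) -> bool:
    """Store the payload unless this offset was already processed."""
    row = conn.execute(
        "SELECT last_offset FROM progress WHERE topic = ? AND part = ?",
        (topic, partition),
    ).fetchone()
    if row is not None and offset <= row[0]:
        return False  # duplicate delivery, skip it
    with conn:  # payload and offset are committed in one transaction
        conn.execute(
            "INSERT INTO payloads VALUES (?, ?, ?, ?)",
            (topic, partition, offset, value),
        )
        conn.execute(
            "INSERT OR REPLACE INTO progress VALUES (?, ?, ?)",
            (topic, partition, offset),
        )
    return True


results = [
    store_once("local--kstream", 0, 0, b"first"),
    store_once("local--kstream", 0, 1, b"second"),
    store_once("local--kstream", 0, 1, b"second"),  # redelivered after a restart
]
stored = conn.execute("SELECT COUNT(*) FROM payloads").fetchone()[0]
print(results, stored)
```

The redelivered record is detected through the stored offset, so the destination holds each payload once even though delivery was at-least-once.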
Yield from stream
Sometimes it is useful to yield values from a stream, so you can consume events at your own pace or because you want to return results to the frontend (SSE example).
If you use the yield keyword inside a coroutine, it is transformed into an asynchronous generator function, meaning that calling it produces an async generator that can be consumed.
Consuming an async generator is simple: you just use the async for in clause. Because events are only consumed inside the async for loop, you have to make sure that the Stream has been started properly before entering it and that the stream has been properly stopped after leaving it.
To facilitate the process, there is a context manager that takes care of the starting/stopping process.
# Create your stream
@stream_engine.stream("local--kstream")
async def stream(cr: ConsumerRecord, stream: Stream):
    yield cr.value

# Consume the stream:
async with stream as stream_flow:  # Use the context manager
    async for value in stream_flow:
        ...
        # do something with value (cr.value)
Note
If for some reason you interrupt the "async for in" over the async generator, the Stream will stop consuming events, meaning that the lag will increase.
Note
Yield from a stream only works with the typing approach.
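The effect of yield on an async def function can be checked with the standard library alone. The following sketch uses plain functions rather than kstreams streams; plain, yielding and collect are illustrative names.

```python
import asyncio
import inspect


async def plain(value):
    return value


async def yielding(values):
    for value in values:
        yield value  # the presence of yield makes this an async generator function


async def collect():
    received = []
    # An async generator is consumed with "async for"
    async for value in yielding([b"a", b"b", b"c"]):
        received.append(value)
    return received


is_coroutine = inspect.iscoroutinefunction(plain)
is_async_gen = inspect.isasyncgenfunction(yielding)
received = asyncio.run(collect())
print(is_coroutine, is_async_gen, received)
```

Nothing is produced until the consumer iterates, which is why interrupting the loop also pauses consumption.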
Get many
Get a batch of events from the assigned TopicPartition.
Prefetched events are returned in batches by topic-partition. If messages are not available in the prefetched buffer, this method waits timeout_ms milliseconds.
Attributes:
Name | Type | Description |
---|---|---|
partitions | Optional[List[TopicPartition]] | The partitions to fetch messages from. If no partition is specified, all subscribed partitions are used |
timeout_ms | Optional[int] | Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are currently available in the buffer, else returns empty. Must not be negative. |
max_records | Optional[int] | The amount of records to fetch. |
Returns:
Type | Description |
---|---|
Dict[TopicPartition, List[ConsumerRecord]] | Topic-partition to list of records |
Example
@stream_engine.stream(topic, ...)
async def stream(stream: Stream):
    while True:
        data = await stream.getmany(max_records=5)
        print(data)
Source code in kstreams/streams.py
Warning
This approach does not work with Dependency Injection.
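Since getmany returns a dictionary keyed by topic-partition, the batch is usually processed one partition at a time. The sketch below shows one way to walk such a structure; TopicPartition and ConsumerRecord are namedtuple stand-ins with only the fields needed here, and last_offsets is a hypothetical helper.

```python
from collections import namedtuple
from typing import Dict, List

# Stand-ins with only the fields this sketch needs
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
ConsumerRecord = namedtuple("ConsumerRecord", ["offset", "value"])


def last_offsets(
    batch: Dict[TopicPartition, List[ConsumerRecord]]
) -> Dict[TopicPartition, int]:
    """Return, per partition, the highest offset seen in the batch."""
    return {
        tp: max(record.offset for record in records)
        for tp, records in batch.items()
        if records
    }


# Hypothetical batch shaped like the documented return type
batch = {
    TopicPartition("local--kstreams", 0): [
        ConsumerRecord(10, b"a"),
        ConsumerRecord(11, b"b"),
    ],
    TopicPartition("local--kstreams", 1): [ConsumerRecord(3, b"c")],
}

offsets = last_offsets(batch)
total = sum(len(records) for records in batch.values())
print(offsets, total)
```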
Rebalance Listener
In some cases you will need a RebalanceListener so that different actions can be performed when partitions are assigned to or revoked from the stream.
Use cases
- Cleanup or custom state save on the start of a rebalance operation
- Saving offsets in a custom store when a partition is revoked
- Load a state or cache warmup on completion of a successful partition re-assignment.
Metrics Rebalance Listener
Kstreams uses a default listener for all the streams to clean the metrics after a rebalance takes place.
kstreams.MetricsRebalanceListener
Source code in kstreams/rebalance_listener.py
on_partitions_assigned(assigned) async
Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.
This method will start the Prometheus metrics.
Attributes:
Name | Type | Description |
---|---|---|
assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned) |
Source code in kstreams/rebalance_listener.py
on_partitions_revoked(revoked) async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
This method will clean up the Prometheus metrics.
Attributes:
Name | Type | Description |
---|---|---|
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Source code in kstreams/rebalance_listener.py
Manual Commit
If manual commit is enabled, you might want to use the ManualCommitRebalanceListener. This rebalance listener will call commit before the stream partitions are revoked, to avoid the CommitFailedError error and duplicate message delivery after a rebalance. See the code example with manual commit.
kstreams.ManualCommitRebalanceListener
Source code in kstreams/rebalance_listener.py
on_partitions_revoked(revoked) async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
If manual commit is enabled, commit is called before the consumer's partitions are revoked to prevent the CommitFailedError error and duplicate message delivery after a rebalance.
Attributes:
Name | Type | Description |
---|---|---|
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Source code in kstreams/rebalance_listener.py
Note
ManualCommitRebalanceListener also includes the MetricsRebalanceListener functionality.
Custom Rebalance Listener
If you want to define a custom RebalanceListener, it has to inherit from kstreams.RebalanceListener.
kstreams.RebalanceListener
A callback interface that the user can implement to trigger custom actions when the set of partitions is assigned to or revoked from the Stream.
Example
from typing import Set

from kstreams import RebalanceListener, Stream, TopicPartition
from .resource import stream_engine

class MyRebalanceListener(RebalanceListener):
    async def on_partitions_revoked(
        self, revoked: Set[TopicPartition]
    ) -> None:
        # Do something with the revoked partitions
        # or with the Stream
        print(self.stream)

    async def on_partitions_assigned(
        self, assigned: Set[TopicPartition]
    ) -> None:
        # Do something with the assigned partitions
        # or with the Stream
        print(self.stream)

@stream_engine.stream(topic, rebalance_listener=MyRebalanceListener())
async def my_stream(stream: Stream):
    async for event in stream:
        ...
Source code in kstreams/rebalance_listener.py
on_partitions_assigned(assigned) async
Coroutine to be called after partition re-assignment completes and before the consumer starts fetching data again.
It is guaranteed that all the processes in a consumer group will execute their on_partitions_revoked callback before any instance executes its on_partitions_assigned callback.
Use cases
- Load a state or cache warmup on completion of a successful partition re-assignment.
Attributes:
Name | Type | Description |
---|---|---|
assigned | Set[TopicPartition] | Partitions assigned to the consumer (may include partitions that were previously assigned) |
Note
The Stream is available using self.stream
Source code in kstreams/rebalance_listener.py
on_partitions_revoked(revoked) async
Coroutine to be called before a rebalance operation starts and after the consumer stops fetching data.
If you are using manual commit, you have to commit all consumed offsets here to avoid duplicate message delivery after the rebalance is finished.
Use cases
- Cleanup or custom state save on the start of a rebalance operation
- Saving offsets in a custom store
Attributes:
Name | Type | Description |
---|---|---|
revoked | Set[TopicPartition] | Partitions that were assigned to the consumer on the last rebalance |
Note
The Stream is available using self.stream
Source code in kstreams/rebalance_listener.py
Note
It is also possible to inherit from ManualCommitRebalanceListener and MetricsRebalanceListener.