Django-streams
A Django application for producing and consuming Kafka events, powered by kstreams.

Installation
pip install django-streams
or with poetry
poetry add django-streams
and add it to INSTALLED_APPS:
INSTALLED_APPS = [
    ...
    "django_streams",
    ...
    "my_streams_app",
    # etc...
]
Documentation
https://kpn.github.io/django-streams/
Usage
Create the engine:
# my_streams_app/engine.py
from django_streams import create_engine
from kstreams.backends import Kafka
stream_engine = create_engine(
    title="test-engine",
    backend=Kafka(),
)
Note
To configure the backend, follow the kstreams backend documentation.
Consuming events
Define your streams:
# my_streams_app/streams.py
import logging
from kstreams import ConsumerRecord
from .engine import stream_engine
logger = logging.getLogger(__name__)
@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id")  # your consumer
async def consumer_task(cr: ConsumerRecord):
    # The coroutine receives one consumed record per call
    logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
Then, in your apps.py, import the module that defines your streams so that the coroutines are registered when Django starts:
# my_streams_app/apps.py
from django.apps import AppConfig
class StreamingAppConfig(AppConfig):
    name = "my_streams_app"
    def ready(self):
        from . import streams  # import the streams module
Now you can run the worker:
python manage.py worker
Producing events
Events can be produced from sync or async code. In a sync context you must use stream_engine.sync_send; in an async context use stream_engine.send. In both cases a RecordMetadata is returned.
# my_streams_app/views.py
from django.http import HttpResponse
from django.views.generic import View
from .engine import stream_engine
class HelloWorldView(View):
    def get(self, request, *args, **kwargs):
        record_metadata = stream_engine.sync_send(
            "hello-kpn",
            value=b"hello world",
            key="hello",
            partition=None,
            timestamp_ms=None,
            headers=None,
        )
        return HttpResponse(f"Event metadata: {record_metadata}")
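The view above covers the sync case. For the async case, the call is awaited instead. The following is a minimal sketch that runs without Kafka: StubEngine and StubRecordMetadata are hypothetical stand-ins written for this example and are not part of django-streams or kstreams. In a real project you would import stream_engine from your engine.py as shown earlier.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class StubRecordMetadata:
    """Hypothetical stand-in for the RecordMetadata returned by the real engine."""

    topic: str
    partition: int
    offset: int


class StubEngine:
    """Hypothetical in-memory stand-in for stream_engine; it never contacts Kafka."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Optional[str], Optional[bytes]]] = []

    async def send(
        self,
        topic: str,
        value: Optional[bytes] = None,
        key: Optional[str] = None,
    ) -> StubRecordMetadata:
        # Record the event locally and hand back fake metadata
        self.sent.append((topic, key, value))
        return StubRecordMetadata(topic=topic, partition=0, offset=len(self.sent) - 1)


# Assumption: replaced by `from .engine import stream_engine` in a real app
stream_engine = StubEngine()


async def say_hello() -> StubRecordMetadata:
    # In an async context the send coroutine is awaited directly
    return await stream_engine.send("hello-kpn", value=b"hello world", key="hello")


record_metadata = asyncio.run(say_hello())
print(f"Event metadata: {record_metadata}")
```

Inside an async Django view or a stream coroutine there is already a running event loop, so only the `await stream_engine.send(...)` line would be needed there, without asyncio.run.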
Benchmark
Producer:
| Total produced events | Time (seconds) | 
|---|---|
| 1 | 0.004278898239135742 | 
| 10 | 0.030963897705078125 | 
| 100 | 0.07049298286437988 | 
| 1000 | 0.6609988212585449 | 
| 10000 | 6.501222133636475 | 
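The table reports total elapsed time per batch. Dividing the event count by the elapsed time gives throughput, which is easier to compare across rows. The sketch below only reuses the figures from the table; the helper name is illustrative and nothing here measures Kafka itself.

```python
# (events produced, elapsed seconds), copied from the benchmark table
benchmark = [
    (1, 0.004278898239135742),
    (10, 0.030963897705078125),
    (100, 0.07049298286437988),
    (1000, 0.6609988212585449),
    (10000, 6.501222133636475),
]


def throughput(events: int, seconds: float) -> float:
    """Return the average number of events produced per second."""
    return events / seconds


rates = {events: throughput(events, seconds) for events, seconds in benchmark}

for events, rate in rates.items():
    print(f"{events:>6} events: {rate:10.1f} events/s")
```

On these numbers, throughput rises with batch size and levels off at roughly 1,500 events per second for the 1,000 and 10,000 event runs.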
Running tests
./scripts/test
Code formatting
./scripts/format