Django-streams
Django application to produce/consume events from Kafka supported by kstreams
Installation
pip install django-streams
or with poetry
poetry add django-streams
and add it to INSTALLED_APPS
:
INSTALLED_APPS = [
...
"django_streams",
...
"my_streams_app",
# etc...
]
Documentation
https://kpn.github.io/django-streams/
Usage
create the engine
:
# my_streams_app/engine.py
from django_streams import create_engine
from kstreams.backends import Kafka
stream_engine = create_engine(
title="test-engine",
backend=Kafka(),
)
Note
To configure the backend follow the kstreams backend documentation
Consuming events
Define your streams:
# my_streams_app/streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine
@stream_engine.stream("dev-kpn-des--hello-kpn", group_id="django-streams-principal-group-id") # your consumer
async def consumer_task(cr: ConsumerRecord):
async for cr in stream:
logger.info(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
and then in your apps.py
you must import the python module
or your coroutines
# my_streams_app/apps.py
from django.apps import AppConfig
class StreamingAppConfig(AppConfig):
name = "streaming_app"
def ready(self):
from . import streams # import the streams module
Now you can run the worker:
python manage.py worker
Producing events
Producing events can be sync
or async
. If you are in a sync
context you must use stream_engine.sync_send
, otherwise stream_engine.send. For both cases a RecordMetadata
is returned.
# streaming_app/views.py
from django.http import HttpResponse
from django.views.generic import View
from .engine import stream_engine
class HelloWorldView(View):
def get(self, request, *args, **kwargs):
record_metadata = stream_engine.sync_send(
"hello-kpn",
value=b"hello world",
key="hello",
partition=None,
timestamp_ms=None,
headers=None,
)
return HttpResponse(f"Event metadata: {record_metadata}")
Benchmark
Producer:
Total produced events | Time (seconds) |
---|---|
1 | 0.004278898239135742 |
10 | 0.030963897705078125 |
100 | 0.07049298286437988 |
1000 | 0.6609988212585449 |
10000 | 6.501222133636475 |
Running tests
./scrtips/test
Code formating
./scrtips/format