Test Client
To test your streams
or perform e2e
tests you can make use of the test_utils.TestStreamClient
.
The TestStreamClient
you can send events so you won't need a producer
Let's assume that you have the following code example:
# streams.py
from kstreams import ConsumerRecord
from .engine import stream_engine
topic = "dev-kpn-des--kstreams"
def save_to_db(value):
# Store the value in your Database
...
@stream_engine.stream(topic, group_id="example-group")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
save_to_db(value)
In order to test it, you could mock the kafka Consumer
and Producer
or use the TestStreamClient
# test_stream.py
import pytest
from django_streams.test_utils import TestStreamClient
from .engine import stream_engine
@pytest.mark.asyncio
async def test_streams_consume_events():
topic = "dev-kpn-des--kstreams" # Use the same topic as the stream
event = b'{"message": "Hello world!"}'
with patch("example.on_consume") as save_to_db:
async with TestStreamClient(stream_engine=stream_engine) as test_client:
metadata = await test_client.send(topic, value=event, key="1") # send the event with the test client
current_offset = metadata.offset
assert metadata.topic == topic
# send another event and check that the offset was incremented
metadata = await test_client.send(topic, value=b'{"message": "Hello world!"}', key="1")
assert metadata.offset == current_offset + 1
# check that the event was consumed
assert save_to_db.called
Sync Producer only
In some scenarios, your application will only produce events in a synchronous
way and other application/s will consume it, but you want to make sure that
the event was procuced in a proper way and the topic
contains that event
.
# producer_example.py
from .engine import stream_engine
topic = "dev-kpn-des--hello-world"
def produce(event):
# function to produce events. This function can be
# a django view where an event is produced
stream_engine.sync_send(topic, value=event, key="1")
Then you could have a test_producer_example.py file to test the code:
from producer_example import topic, produce
@pytest.mark.asyncio
async def test_e2e_with_sync_producer_no_stream(stream_engine: StreamEngine):
event = b'{"message": "Hello world!"}'
async with TestStreamClient(stream_engine=stream_engine) as client:
# produce 2 events
produce(event)
produce(event)
# check that the event was placed in a topic in a proper way
consumer_record = await client.get_event(topic_name=topic)
assert consumer_record.value == event
# check that the event was placed in a topic in a proper way
consumer_record = await client.get_event(topic_name=topic)
assert consumer_record.value == event