Skip to content

Test Client

To test your streams or perform e2e tests you can make use of the test_utils.TestStreamClient. With the TestStreamClient you can send events yourself, so you won't need a real producer.

Let's assume that you have the following code example:

# streams.py
from kstreams import ConsumerRecord

from .engine import stream_engine


# Topic consumed by the stream defined below.
topic = "dev-kpn-des--kstreams"

def save_to_db(value):
    """Store ``value`` in your database (placeholder: put real storage code here)."""
    return None


@stream_engine.stream(topic, group_id="example-group")
async def consume(cr: ConsumerRecord):
    """Print every consumed record and pass its payload to ``save_to_db``.

    Args:
        cr: The record received from ``topic``.
    """
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
    # The payload lives on the record; a bare ``value`` is undefined here.
    save_to_db(cr.value)

In order to test it, you could mock the Kafka Consumer and Producer, or use the TestStreamClient:

# test_stream.py
from unittest.mock import patch

import pytest
from django_streams.test_utils import TestStreamClient

from .engine import stream_engine


@pytest.mark.asyncio
async def test_streams_consume_events():
    """Send two events with the test client and check that the stream consumed them."""
    topic = "dev-kpn-des--kstreams"  # Use the same topic as the stream
    event = b'{"message": "Hello world!"}'

    # Patch the function that the stream calls (save_to_db in streams.py) so the
    # test never touches a real database. Replace "example" with the name of the
    # package that contains streams.py.
    with patch("example.streams.save_to_db") as save_to_db:
        async with TestStreamClient(stream_engine=stream_engine) as test_client:
            metadata = await test_client.send(topic, value=event, key="1")  # send the event with the test client
            current_offset = metadata.offset
            assert metadata.topic == topic

            # send another event and check that the offset was incremented
            metadata = await test_client.send(topic, value=event, key="1")
            assert metadata.offset == current_offset + 1

    # check that the event was consumed
    assert save_to_db.called

Sync Producer only

In some scenarios, your application will only produce events in a synchronous way and other applications will consume them, but you still want to make sure that the event was produced properly and that the topic contains it.

# producer_example.py
from .engine import stream_engine


# Topic that `produce` sends events to.
topic = "dev-kpn-des--hello-world"


def produce(event):
    """Send ``event`` to ``topic`` synchronously through the stream engine.

    This function can be a django view where an event is produced.
    """
    stream_engine.sync_send(topic, key="1", value=event)

Then you could have a test_producer_example.py file to test the code:

from producer_example import topic, produce


@pytest.mark.asyncio
async def test_e2e_with_sync_producer_no_stream(stream_engine: StreamEngine):
    """Produce two events synchronously and read both back from the topic.

    NOTE(review): this snippet uses ``pytest``, ``TestStreamClient`` and
    ``StreamEngine`` without importing them, and expects a ``stream_engine``
    fixture; add the imports and fixture that match your project.
    """
    payload = b'{"message": "Hello world!"}'
    expected_events = 2

    async with TestStreamClient(stream_engine=stream_engine) as client:
        # Produce the events through the application's synchronous producer.
        for _ in range(expected_events):
            produce(payload)

        # Every produced event must come back from the topic unchanged.
        for _ in range(expected_events):
            record = await client.get_event(topic_name=topic)
            assert record.value == payload