Skip to content

Kstreams

kstreams is a library/micro framework to use with kafka. It has simple kafka streams implementation that gives certain guarantees, see below.

Build status codecov python version

Requirements

python 3.8+

Installation

pip install kstreams

You will need a worker, we recommend aiorun

pip install aiorun

Usage

import aiorun
from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for i in range(5):
        metadata = await stream_engine.send("local--kstreams", value=payload)
        print(f"Message sent: {metadata}")


async def start():
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)

Kafka configuration

Configure kafka using the kafka backend provided.

Development

This repo requires the use of poetry instead of pip. Note: If you want to have the virtualenv in the same path as the project first you should run poetry config --local virtualenvs.in-project true

To install the dependencies just execute:

poetry install

Then you can activate the virtualenv with

poetry shell

Run test:

./scripts/test

Run code linting (black and isort)

./scripts/lint

Commit messages

The use of commitizen is recommended. Commitizen is part of the dev dependencies.

cz commit