Skip to content

Getting Started

You can start using kstreams with simple producers and consumers and/or integrate it with any async framework like FastAPI.

Simple consumer and producer

Simple use case
import asyncio
from kstreams import create_engine, ConsumerRecord

# Engine that the stream decorator and the send() calls below are bound to.
stream_engine = create_engine(title="my-stream-engine")


# Subscribes to the same topic that produce() publishes to, so the events
# sent by this script are the ones printed here.
@stream_engine.stream("local--py-streams", group_id="de-my-partition")
async def consume(cr: ConsumerRecord):
    """Handle a consumed record by printing its headers and value."""
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    """Send the same JSON payload five times, sleeping 5 seconds after each send."""
    message = b'{"message": "Hello world!"}'

    for _ in range(5):
        record_metadata = await stream_engine.send(
            "local--py-streams",
            value=message,
            key="1",
        )
        print(f"Message sent: {record_metadata}")
        await asyncio.sleep(5)


async def start():
    """Start the stream engine, then run the producer to completion."""
    await stream_engine.start()
    await produce()


async def shutdown():
    """Stop the stream engine."""
    await stream_engine.stop()


if __name__ == "__main__":
    # asyncio.get_event_loop() is deprecated when no loop is running, so the
    # loop is created explicitly and registered as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(start())
        # Keep the loop running after the producer has finished.
        loop.run_forever()
    finally:
        # Stop the engine before the loop is closed, even on errors/interrupts.
        loop.run_until_complete(shutdown())
        loop.close()

(This script is complete, it should run "as is")

In the previous example you can see some boilerplate regarding how to start the program. We recommend using aiorun, so you won't have to worry about setting signal handlers, shutdown callbacks, graceful shutdown and closing the event loop.

Usage with aiorun
import asyncio

import aiorun
from kstreams import create_engine, ConsumerRecord

# Engine that the stream decorator and the send() calls below are bound to.
stream_engine = create_engine(title="my-stream-engine")


# Subscribes to the same topic that produce() publishes to, so the events
# sent by this script are the ones printed here.
@stream_engine.stream("local--py-streams", group_id="de-my-partition")
async def consume(cr: ConsumerRecord):
    """Handle a consumed record by printing its headers and value."""
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    """Send the same JSON payload five times, sleeping 5 seconds after each send."""
    payload = b'{"message": "Hello world!"}'

    # The loop index is not needed; only the number of iterations matters.
    for _ in range(5):
        metadata = await stream_engine.send(
            "local--py-streams",
            value=payload,
            key="1",
        )
        print(f"Message sent: {metadata}")
        await asyncio.sleep(5)


async def start():
    """Start the stream engine, then run the producer to completion."""
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    """Stop the stream engine; the `loop` argument is accepted but not used."""
    await stream_engine.stop()


if __name__ == "__main__":
    # aiorun drives the program: it runs start() and is given shutdown as
    # its shutdown callback.
    aiorun.run(
        start(),
        stop_on_unhandled_errors=True,
        shutdown_callback=shutdown,
    )

(This script is complete, it should run "as is")

FastAPI

The following code example shows how kstreams can be integrated with any async framework like FastAPI. The full example can be found here

First, we need to create an engine:

Create the StreamEngine
# streaming.engine.py
from kstreams import create_engine

# Engine instance that the streams module imports to register its streams.
stream_engine = create_engine(
    title="my-stream-engine",
)

Define the streams:

Application stream
# streaming.streams.py
from .engine import stream_engine
from kstreams import ConsumerRecord


@stream_engine.stream("local--kstream")
async def stream(cr: ConsumerRecord):
    """Handle a record consumed from `local--kstream` by printing its headers and value."""
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

Create the FastAPI:

FastAPI
# app.py
from fastapi import FastAPI
from starlette.responses import Response
from starlette_prometheus import PrometheusMiddleware, metrics

from .streaming.streams import stream_engine

# FastAPI application that the handlers, middleware and routes below attach to.
app = FastAPI()

# NOTE(review): newer FastAPI releases deprecate on_event in favour of
# lifespan handlers — confirm against the FastAPI version in use.
@app.on_event("startup")
async def startup_event():
    """Start the stream engine when the application starts."""
    await stream_engine.start()

@app.on_event("shutdown")
async def shutdown_event():
    """Stop the stream engine when the application shuts down."""
    await stream_engine.stop()


# NOTE(review): registered as GET although the handler produces an event;
# POST may be a better fit — confirm before changing the route.
@app.get("/events")
async def post_produce_event() -> Response:
    """Send a fixed JSON message to `local--kstream` and report where it landed."""
    body = '{"message": "hello world!"}'.encode()
    record = await stream_engine.send("local--kstream", value=body)

    summary = (
        f"Produced event on topic: {record.topic}, "
        f"part: {record.partition}, offset: {record.offset}"
    )
    return Response(summary)


# Register the Prometheus middleware and serve the metrics handler at /metrics.
app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics)

Changing Kafka settings

To modify the settings of a cluster, like the servers, refer to the backends docs