# Getting Started
You can start using kstreams with simple producers and consumers, or integrate it with any async framework like FastAPI.
## Simple consumer and producer
```python title="Simple use case"
import asyncio

from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for i in range(5):
        metadata = await stream_engine.send("local--py-stream", value=payload, key="1")
        print(f"Message sent: {metadata}")
        await asyncio.sleep(5)


async def start():
    await stream_engine.start()
    await produce()


async def shutdown():
    await stream_engine.stop()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(start())
        loop.run_forever()
    finally:
        loop.run_until_complete(shutdown())
        loop.close()
```

*(This script is complete, it should run "as is")*
## Recommended usage
The previous example contains some boilerplate for starting the program. We recommend using aiorun, so you don't have to worry about setting signal handlers, shutdown callbacks, graceful shutdown, or closing the event loop.
```python title="Usage with aiorun"
import asyncio

import aiorun

from kstreams import create_engine, ConsumerRecord

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--py-stream", group_id="de-my-partition")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for i in range(5):
        metadata = await stream_engine.send("local--py-stream", value=payload, key="1")
        print(f"Message sent: {metadata}")
        await asyncio.sleep(5)


async def start():
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
```

*(This script is complete, it should run "as is")*
## FastAPI
The following example shows how kstreams can be integrated with an async framework like FastAPI. The full example can be found here.

First, we need to create an engine:
```python title="Create the StreamEngine"
# streaming.engine.py
from kstreams import create_engine

stream_engine = create_engine(
    title="my-stream-engine",
)
```
Define the streams:

```python title="Application stream"
# streaming.streams.py
from kstreams import ConsumerRecord

from .engine import stream_engine


@stream_engine.stream("local--kstream")
async def stream(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
```
Create the FastAPI application:

```python title="FastAPI"
# app.py
from fastapi import FastAPI
from starlette.responses import Response
from starlette_prometheus import PrometheusMiddleware, metrics

from .streaming.streams import stream_engine

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    await stream_engine.start()


@app.on_event("shutdown")
async def shutdown_event():
    await stream_engine.stop()


@app.get("/events")
async def post_produce_event() -> Response:
    payload = '{"message": "hello world!"}'

    metadata = await stream_engine.send(
        "local--kstream",
        value=payload.encode(),
    )
    msg = (
        f"Produced event on topic: {metadata.topic}, "
        f"part: {metadata.partition}, offset: {metadata.offset}"
    )
    return Response(msg)


app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics)
```
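To try the example locally you can serve the app with any ASGI server, for instance uvicorn. The entry point below is a hypothetical addition (not part of the original example) and assumes `app.py` is importable as `app` and that a Kafka broker is reachable with the default settings:

```python
# run.py -- hypothetical entry point for local testing, not part of the original example
import uvicorn

if __name__ == "__main__":
    # Adjust the import string to match your package layout, e.g. "myproject.app:app"
    uvicorn.run("app:app", host="0.0.0.0", port=8000)
```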
## Changing Kafka settings
To modify the settings of a cluster, such as the bootstrap servers, refer to the backends docs.
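As a minimal sketch (assuming the Kafka backend exposed under `kstreams.backends.kafka`; check the backends docs for the exact fields), a backend configured for your cluster can be passed to `create_engine`:

```python
from kstreams import create_engine
from kstreams.backends.kafka import Kafka

# Sketch of pointing the engine at specific brokers; the available fields
# (bootstrap_servers, security settings, ...) are described in the backends docs.
backend = Kafka(bootstrap_servers=["kafka-1:9092", "kafka-2:9092"])

stream_engine = create_engine(
    title="my-stream-engine",
    backend=backend,
)
```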