Serialization
Kafka's job is to move bytes from producer to consumers, through a topic.
By default, this is exactly what kstreams does.
from kstreams import Stream

from .streams_roster import stream_roster

my_stream = Stream(
    "local--hello-world",
    func=stream_roster,
    config={
        "group_id": "example-group",
    },
)
Here, the value of each ConsumerRecord passed to stream_roster is bytes.
To keep your code pythonic, we provide a mechanism to serialize and deserialize these bytes into something more useful. This way, you can work with other data structures, such as a dict or dataclasses.
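As a rough illustration of what that conversion involves, the sketch below uses only the standard library, with no kstreams calls. The Greeting dataclass and the JSON encoding are assumptions made for the example; any other wire format follows the same pattern.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Greeting:
    # Hypothetical event shape, used only for this illustration.
    message: str


# What a producer would put on the wire: UTF-8 encoded JSON.
raw_value = json.dumps(asdict(Greeting(message="hello"))).encode("utf-8")

# What a consumer receives by default: plain bytes.
assert isinstance(raw_value, bytes)

# Decoding turns the bytes back into structures the application can use.
as_dict = json.loads(raw_value.decode("utf-8"))
as_dataclass = Greeting(**as_dict)
```

The serializer and middleware described on this page are the places where these encode and decode steps would normally live, so that application code only sees the dict or dataclass.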
Sometimes it is easier to work with a dict in your app, give it to kstreams, and let it transform the dict into bytes to be delivered to Kafka. For this situation, you need to implement kstreams.serializers.Serializer.
kstreams.serializers.Serializer
Protocol used by the Stream to serialize.
A Protocol is similar to features in other languages, such as an interface or a trait.
End users should provide their own class implementing this protocol.
For example, a JsonSerializer:

from typing import Optional, Dict
import json


class JsonSerializer:
    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """Return UTF-8 encoded payload"""
        value = json.dumps(payload)
        return value.encode()
Notice that you don't need to inherit from anything; you only have to comply with the Protocol.
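To see this structural typing at work without a broker, here is a minimal sketch. SerializerLike is a hypothetical local stand-in that mirrors the serialize signature shown above (it is not the library's own class), and DataclassJsonSerializer and Greeting are illustrative names. The serializer has no base class, yet it satisfies the protocol because it has a matching method.

```python
import asyncio
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    # Local stand-in mirroring the serialize signature shown above.
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes: ...


@dataclass
class Greeting:
    # Hypothetical event shape, used only for this illustration.
    message: str


class DataclassJsonSerializer:
    # No base class: providing a matching serialize method is enough.
    async def serialize(self, payload, headers=None, serializer_kwargs=None) -> bytes:
        # Convert dataclass instances to a dict first; pass dicts through.
        if is_dataclass(payload) and not isinstance(payload, type):
            payload = asdict(payload)
        return json.dumps(payload).encode("utf-8")


serializer = DataclassJsonSerializer()
encoded = asyncio.run(serializer.serialize(Greeting(message="test")))
```

Note that a runtime isinstance check against a Protocol only verifies that the method exists, not that its signature matches; a static type checker is what would catch a mismatched signature.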
serialize(payload, headers=None, serializer_kwargs=None)
async
Implement this method to serialize the payload into bytes before it is sent to the topic.
The other situation is when you consume from Kafka (or other brokers). Instead of dealing with bytes, you may want your function to receive a dict that is ready to be used. For those cases, use a middleware. For example, we can implement a JsonDeserializerMiddleware:
import json

from kstreams import middleware, ConsumerRecord


class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord):
        # Records without a value are passed through untouched
        if cr.value is not None:
            data = json.loads(cr.value.decode())
            cr.value = data
        return await self.next_call(cr)
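The decode step can be tried in isolation, without a running broker. The following sketch assumes a hypothetical FakeRecord stand-in that models only the value attribute of a ConsumerRecord, and a free function decode_json that repeats the middleware body; neither is part of kstreams. It shows why the None check matters: a record without a value passes through unchanged instead of raising on .decode().

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeRecord:
    # Hypothetical stand-in for ConsumerRecord; only `value` is modelled.
    value: Any


async def decode_json(cr: FakeRecord) -> FakeRecord:
    # Same logic as the middleware body: skip records without a value.
    if cr.value is not None:
        cr.value = json.loads(cr.value.decode())
    return cr


decoded = asyncio.run(decode_json(FakeRecord(value=b'{"message": "test"}')))
empty = asyncio.run(decode_json(FakeRecord(value=None)))
```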
It is also possible to use kstreams.serializers.Deserializer for deserialization, but it will be deprecated.
kstreams.serializers.Deserializer
Protocol used by the Stream to deserialize.
A Protocol is similar to features in other languages, such as an interface or a trait.
End users should provide their own class implementing this protocol.
For example, a JsonDeserializer:

import json

from kstreams import ConsumerRecord


class JsonDeserializer:
    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        data = json.loads(consumer_record.value.decode())
        consumer_record.value = data
        return consumer_record
deserialize(consumer_record, **kwargs)
async
Implement this method to deserialize the data received from the topic.
Warning
kstreams.serializers.Deserializer will be deprecated; use middlewares instead.
Usage
Once you have written your serializer or deserializer, there are two ways to use them: generically, for the whole engine, or per stream.
Initialize the engine with your serializers
By doing this, all the streams will use these serializers by default.

from kstreams import create_engine

stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),
)
Initialize streams with a deserializer and produce events with serializers
from kstreams import middleware, ConsumerRecord


@stream_engine.stream(topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)])
async def hello_stream(cr: ConsumerRecord):
    # remember cr.value is now a dict
    print(cr.value["message"])
    save_to_db(cr)


# The value is passed as a dict; the engine's serializer turns it into bytes
await stream_engine.send(
    topic,
    value={"message": "test"},
    headers={"content-type": consts.APPLICATION_JSON},
    key="1",
)