Serialization
Kafka's job is to move bytes from producers to consumers through a topic.
By default, this is what kstreams does.
from kstreams import Stream
from .streams_roster import stream_roster

my_stream = Stream(
    "local--hello-world",
    func=stream_roster,
    config={
        "group_id": "example-group",
    },
)
With this setup, the value of each ConsumerRecord that reaches stream_roster is bytes.
To keep your code pythonic, we provide a mechanism to serialize/deserialize these bytes into something more useful.
This way, you can work with other data structures, like a dict or dataclasses.
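Under the hood, (de)serializing is just a round trip between bytes and a richer Python object. The sketch below uses only the standard library (json and dataclasses) to show that round trip for a dataclass; Greeting, to_bytes and from_bytes are hypothetical names used for illustration and are not part of kstreams.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Greeting:
    # hypothetical payload type used only for this illustration
    message: str


def to_bytes(greeting: Greeting) -> bytes:
    # dataclass -> dict -> JSON string -> UTF-8 bytes (what Kafka stores)
    return json.dumps(asdict(greeting)).encode("utf-8")


def from_bytes(raw: bytes) -> Greeting:
    # UTF-8 bytes -> JSON string -> dict -> dataclass
    return Greeting(**json.loads(raw.decode("utf-8")))


raw = to_bytes(Greeting(message="hello"))
print(raw)  # b'{"message": "hello"}'
print(from_bytes(raw))  # Greeting(message='hello')
```

A serializer plugged into kstreams would do the first half of this trip, and a deserializer or middleware the second half.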
Serializers
Sometimes it is easier to work with a dict in your app, give it to kstreams, and let it transform it into bytes to be delivered to Kafka. For this situation, you need to implement kstreams.serializers.Serializer.
kstreams.serializers.Serializer
Protocol used by the Stream to serialize.
A Protocol is similar to features in other languages, like an interface or a trait.
End users should provide their own class implementing this protocol.
For example, a JsonSerializer:
from typing import Optional, Dict
import json

class JsonSerializer:
    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """Return UTF-8 encoded payload"""
        value = json.dumps(payload)
        return value.encode()
Notice that you don't need to inherit from anything; you just have to comply with the Protocol.
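The "no inheritance" point is Python's structural typing at work. The sketch below declares a stand-in protocol with typing.Protocol that mirrors the serialize signature shown above; SerializerLike is a hypothetical name, not the class defined in kstreams. Marking it runtime_checkable lets isinstance confirm that JsonSerializer satisfies it even though it never inherits from it.

```python
import asyncio
import json
from typing import Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    # hypothetical stand-in mirroring the serialize signature shown above
    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes: ...


class JsonSerializer:
    # no base class: it only has to provide a matching serialize method
    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        return json.dumps(payload).encode()


serializer = JsonSerializer()
print(isinstance(serializer, SerializerLike))  # True
print(asyncio.run(serializer.serialize({"message": "test"})))  # b'{"message": "test"}'
```

Keep in mind that the runtime isinstance check only verifies that a serialize attribute exists; a static type checker such as mypy is what compares the full signature.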
Source code in kstreams/serializers.py
Deserializers
The other situation is when you consume from Kafka (or other brokers). Instead of dealing with bytes, you may want your function to receive a dict ready to be used.
For those cases, we need to use middlewares.
Deserializers Middleware
For example, we can implement a JsonDeserializerMiddleware:
import json

from kstreams import middleware, ConsumerRecord

class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord):
        # leave tombstones (None values) untouched
        if cr.value is not None:
            data = json.loads(cr.value.decode())
            cr.value = data
        return await self.next_call(cr)
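To see why next_call matters, here is a self-contained toy version of the chain. It does not import kstreams: ToyRecord, ToyJsonMiddleware and the way next_call is wired through the constructor are hypothetical stand-ins that only mimic the idea of each middleware transforming the record and then handing it to the next callable, which eventually is the stream function.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class ToyRecord:
    # hypothetical stand-in for kstreams.ConsumerRecord
    value: Optional[Any]


Handler = Callable[[ToyRecord], Awaitable[Any]]


class ToyJsonMiddleware:
    # hypothetical stand-in: it is given the next callable in the chain
    def __init__(self, next_call: Handler) -> None:
        self.next_call = next_call

    async def __call__(self, cr: ToyRecord) -> Any:
        if cr.value is not None:  # tombstones (None) pass through untouched
            cr.value = json.loads(cr.value.decode())
        return await self.next_call(cr)


async def stream_func(cr: ToyRecord) -> Any:
    # the end of the chain sees the already-decoded value
    return cr.value


chain = ToyJsonMiddleware(next_call=stream_func)
print(asyncio.run(chain(ToyRecord(value=b'{"message": "test"}'))))  # {'message': 'test'}
print(asyncio.run(chain(ToyRecord(value=None))))  # None
```

Because each middleware only knows about the next callable, several of them can be stacked without the stream function changing.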
Old Deserializers
The old-fashioned way is to use Deserializers, which have been deprecated (but are still maintained) in favor of middlewares.
Deserializers must implement the Deserializer Protocol.
Example
import json
from kstreams import ConsumerRecord

class JsonDeserializer:
    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        # leave tombstones (None values) untouched
        if consumer_record.value is not None:
            data = json.loads(consumer_record.value.decode())
            consumer_record.value = data
        return consumer_record
Source code in kstreams/serializers.py
Warning: kstreams.serializers.Deserializer will be deprecated; use middlewares instead.
Usage
Once you have written your serializer and middleware/deserializer, there are two ways to use them:
- Globally: when a Serializer and/or Deserializer is set on the StreamEngine instance
- Per case: when a Serializer is passed to the send coroutine, or a Middleware/Deserializer is set on a stream
Globally
The engine is initialized with a serializer and a deserializer. By doing this, all the streams use the deserializer by default, and every time an event is produced the default serializer is used.
from kstreams import create_engine, ConsumerRecord

topic = "local--kstreams"

stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),
    deserializer=JsonDeserializer(),  # old-fashioned way; it will be deprecated
)

@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
    # remember cr.value is now a dict
    print(cr.value["message"])
    save_to_db(cr)
    assert cr.value == {"message": "test"}

async def produce():
    # the global JsonSerializer turns the dict into bytes
    await stream_engine.send(
        topic,
        value={"message": "test"},
        headers={"content-type": "application/json"},
        key="1",
    )
Per case
This is when streams are initialized with a deserializer (preferably a middleware) and events are produced with a serializer passed to the send function.
- If a global serializer is set but we call send(serializer=...), then the local serializer is used, not the global one.
- If a global deserializer is set but a stream has a local one, then the local deserializer is used. In other words, the most specific deserializer will be used.
from kstreams import create_engine, middleware, ConsumerRecord

topic = "local--kstreams"

# stream_engine created without a `serializer/deserializer`
stream_engine = create_engine(
    title="my-stream-engine",
)

# Alternatively use deserializer=JsonDeserializer(), but it will be deprecated
@stream_engine.stream(
    topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)]
)
async def hello_stream(cr: ConsumerRecord):
    # remember cr.value is now a dict
    print(cr.value["message"])
    save_to_db(cr)

async def produce():
    # send with a serializer
    await stream_engine.send(
        topic,
        value={"message": "test"},
        headers={"content-type": "application/json"},
        serializer=JsonSerializer(),  # a global serializer, if any, is not used here
        key="1",
    )
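The precedence rules above boil down to "use the local value when there is one, otherwise fall back to the global one". A minimal sketch of that lookup for the stream-level deserializer, assuming None means "not configured"; pick_most_specific is a hypothetical helper, not a kstreams function, and the strings stand in for real deserializer objects.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def pick_most_specific(local: Optional[T], global_: Optional[T]) -> Optional[T]:
    # hypothetical helper: a stream-level setting wins over the engine-level one
    return local if local is not None else global_


# strings stand in for real deserializer objects
print(pick_most_specific("stream-deserializer", "engine-deserializer"))  # stream-deserializer
print(pick_most_specific(None, "engine-deserializer"))  # engine-deserializer
print(pick_most_specific(None, None))  # None
```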
Forcing raw data
There are situations where a global serializer is in use but we still want to produce raw data, for example when producing to a DLQ (dead-letter queue).
For this case, we must set the serializer option to None:
from kstreams import create_engine, ConsumerRecord

topic = "local--kstreams"
dlq_topic = "dlq--kstreams"

stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),  # Global serializer
)

class DeserializationException(Exception):
    """Hypothetical error raised by your own processing code (e.g. save_to_db)."""

@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
    try:
        # no deserializer is configured, so cr.value is raw bytes
        save_to_db(cr)
    except DeserializationException:
        await stream_engine.send(
            dlq_topic,
            value=cr.value,
            headers=cr.headers,
            key=cr.key,
            serializer=None,  # force raw data
        )

async def produce():
    # this will produce JSON
    await stream_engine.send(
        topic,
        value={"message": "test"},
        headers={"content-type": "application/json"},
        key="1",
    )
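Because None is a meaningful value for serializer ("do not serialize"), "argument not passed" has to be distinguished from "argument explicitly set to None". A common Python way to do that is a private sentinel object. The toy engine below is a hypothetical sketch of that pattern, not kstreams' actual implementation; it also uses plain synchronous callables as serializers for brevity, whereas kstreams serializers expose an async serialize method. It applies the global serializer by default, a local one when given, and keeps the payload unchanged when serializer=None.

```python
import json
from typing import Any, Callable, List, Optional

_UNSET: Any = object()  # hypothetical sentinel meaning "argument not passed"

Serializer = Callable[[Any], bytes]


class ToyEngine:
    # hypothetical stand-in for a stream engine; "sends" into a list
    def __init__(self, serializer: Optional[Serializer] = None) -> None:
        self.serializer = serializer
        self.sent: List[Any] = []

    def send(self, value: Any, serializer: Any = _UNSET) -> None:
        if serializer is _UNSET:
            serializer = self.serializer  # fall back to the global serializer
        # serializer=None (explicit, or no global one) keeps the raw value
        payload = serializer(value) if serializer is not None else value
        self.sent.append(payload)


def json_serializer(value: Any) -> bytes:
    return json.dumps(value).encode()


engine = ToyEngine(serializer=json_serializer)
engine.send({"message": "test"})  # global serializer -> JSON bytes
engine.send(b"\x00raw", serializer=None)  # forced raw data, untouched
print(engine.sent)  # [b'{"message": "test"}', b'\x00raw']
```

Using a default of None for the parameter instead of a sentinel would make it impossible to tell "use the global serializer" apart from "send raw bytes".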