
Serialization

Kafka's job is to move bytes from producers to consumers through a topic.

By default, this is what kstreams does.

from kstreams import Stream

from .streams_roster import stream_roster

my_stream = Stream(
    "local--hello-world",
    func=stream_roster,
    config={
        "group_id": "example-group",
    },
)

With this setup, the value of each ConsumerRecord that stream_roster receives is raw bytes.

To keep your code pythonic, kstreams provides a mechanism to serialize and deserialize these bytes into something more useful. This way, you can work with other data structures, such as a dict or a dataclass.
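As a rough, library-independent illustration of what "something more useful" means, the sketch below round-trips a dataclass through JSON-encoded bytes, which is roughly the work a serializer and a deserializer split between them. The `Greeting` dataclass and the `to_bytes`/`from_bytes` helpers are hypothetical names used only for this example; they are not part of kstreams.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Greeting:
    # Hypothetical payload type, used only for this illustration
    message: str
    count: int = 1


def to_bytes(greeting: Greeting) -> bytes:
    # dataclass -> dict -> JSON text -> UTF-8 bytes (what travels through Kafka)
    return json.dumps(asdict(greeting)).encode("utf-8")


def from_bytes(raw: bytes) -> Greeting:
    # UTF-8 bytes -> JSON text -> dict -> dataclass (what your code works with)
    return Greeting(**json.loads(raw.decode("utf-8")))


raw = to_bytes(Greeting(message="hello"))
print(raw)  # b'{"message": "hello", "count": 1}'
print(from_bytes(raw))  # Greeting(message='hello', count=1)
```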

Sometimes it is easier to work with a dict in your app, hand it to kstreams, and let kstreams transform it into bytes to be delivered to Kafka. In that case, you need to implement kstreams.serializers.Serializer.

kstreams.serializers.Serializer

Protocol used by the Stream to serialize.

A Protocol is similar to features in other languages, such as interfaces or traits.

End users should provide their own class implementing this protocol.

For example, a JsonSerializer:

from typing import Optional, Dict
import json

class JsonSerializer:

    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """Return UTF-8 encoded payload"""
        value = json.dumps(payload)
        return value.encode()

Notice that you don't need to inherit from anything; you just have to comply with the Protocol.
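To see structural typing in action, the sketch below declares a local stand-in for the protocol with `typing.Protocol` and `@runtime_checkable`, then checks that a plain `JsonSerializer` class with no base class satisfies it. `SerializerLike` is a hypothetical name: it mirrors the `serialize` method described here but is not imported from kstreams, and `headers` is simplified to a plain dict. Keep in mind that `isinstance` against a runtime-checkable protocol only checks that the method exists, not its signature.

```python
import asyncio
import json
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    # Local stand-in mirroring the serialize() method described above
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes: ...


class JsonSerializer:
    # No base class: it matches the protocol purely by having serialize()
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        return json.dumps(payload).encode()


serializer = JsonSerializer()
print(isinstance(serializer, SerializerLike))  # True
print(asyncio.run(serializer.serialize({"message": "test"})))  # b'{"message": "test"}'
```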

Source code in kstreams/serializers.py
class Serializer(Protocol):
    """Protocol used by the Stream to serialize.

    A Protocol is similar to other languages features like an interface or a trait.

    End users should provide their own class implementing this protocol.

    For example a `JsonSerializer`

    ```python
    from typing import Optional, Dict
    import json

    class JsonSerializer:

        async def serialize(
            self,
            payload: dict,
            headers: Optional[Dict[str, str]] = None,
            serializer_kwargs: Optional[Dict] = None,
        ) -> bytes:
            \"""Return UTF-8 encoded payload\"""
            value = json.dumps(payload)
            return value.encode()
    ```

    Notice that you don't need to inherit anything,
    you just have to comply with the Protocol.
    """

    async def serialize(
        self,
        payload: Any,
        headers: Optional[Headers] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """
        Implement this method to serialize the payload into bytes before it is sent to the topic.
        """
        ...

serialize(payload, headers=None, serializer_kwargs=None) async

Implement this method to serialize the payload into bytes before it is sent to the topic.
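The protocol does not say what `serializer_kwargs` should contain. The sketch below assumes one possible convention: treating it as extra keyword arguments for `json.dumps` (here `sort_keys`). This forwarding rule is a hypothetical design choice for illustration, not something kstreams defines.

```python
import asyncio
import json
from typing import Any, Dict, Optional


class JsonSerializer:
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        # Hypothetical convention: serializer_kwargs are extra json.dumps options
        options = serializer_kwargs or {}
        return json.dumps(payload, **options).encode("utf-8")


serializer = JsonSerializer()
default_order = asyncio.run(serializer.serialize({"b": 1, "a": 2}))
ordered = asyncio.run(
    serializer.serialize({"b": 1, "a": 2}, serializer_kwargs={"sort_keys": True})
)
print(default_order)  # b'{"b": 1, "a": 2}'
print(ordered)  # b'{"a": 2, "b": 1}'
```

Forwarding the dict straight into `json.dumps` is convenient, but it lets callers pass any `json.dumps` argument; accepting only a known set of keys is a stricter alternative.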


The other situation is when you consume from Kafka (or another broker). Instead of dealing with bytes, you may want your function to receive a dict that is ready to use. For those cases, use a middleware. For example, a JsonDeserializerMiddleware can be implemented like this:

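import json

from kstreams import ConsumerRecord, middleware


class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord):
        # Records with a null value (e.g. tombstones) are passed through unchanged
        if cr.value is not None:
            # Replace the raw bytes with the parsed JSON (usually a dict)
            cr.value = json.loads(cr.value.decode())
        # Hand the record to the next middleware, or to the stream function
        return await self.next_call(cr)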
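Because the middleware only rewrites `cr.value` and then awaits `self.next_call`, its logic can be exercised without a broker. The sketch below uses hypothetical stand-ins so it runs without kstreams: a `FakeRecord` dataclass in place of `ConsumerRecord`, and a `FakeJsonMiddleware` that takes `next_call` in its constructor instead of inheriting from `BaseMiddleware`. It also shows why the `None` check matters.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class FakeRecord:
    # Hypothetical stand-in for kstreams.ConsumerRecord (only `value` is used)
    value: Optional[Any]


class FakeJsonMiddleware:
    # Same decoding logic as JsonDeserializerMiddleware, with next_call passed in
    def __init__(self, next_call: Callable[[FakeRecord], Awaitable[Any]]):
        self.next_call = next_call

    async def __call__(self, cr: FakeRecord) -> Any:
        if cr.value is not None:
            cr.value = json.loads(cr.value.decode())
        return await self.next_call(cr)


async def handler(cr: FakeRecord) -> Any:
    # Plays the role of the user's stream function
    return cr.value


chain = FakeJsonMiddleware(next_call=handler)
print(asyncio.run(chain(FakeRecord(value=b'{"message": "hi"}'))))  # {'message': 'hi'}
print(asyncio.run(chain(FakeRecord(value=None))))  # None
```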

It is also possible to use kstreams.serializers.Deserializer for deserialization, but it will be deprecated.

kstreams.serializers.Deserializer

Protocol used by the Stream to deserialize.

A Protocol is similar to features in other languages, such as interfaces or traits.

End users should provide their own class implementing this protocol.

For example, a JsonDeserializer:

import json
from kstreams import ConsumerRecord

class JsonDeserializer:

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        data = json.loads(consumer_record.value.decode())
        consumer_record.value = data
        return consumer_record
Source code in kstreams/serializers.py
class Deserializer(Protocol):
    """Protocol used by the Stream to deserialize.

    A Protocol is similar to other languages features like an interface or a trait.

    End users should provide their own class implementing this protocol.

    For example a `JsonDeserializer`

    ```python
    import json
    from kstreams import ConsumerRecord

    class JsonDeserializer:

        async def deserialize(
            self, consumer_record: ConsumerRecord, **kwargs
        ) -> ConsumerRecord:
            data = json.loads(consumer_record.value.decode())
            consumer_record.value = data
            return consumer_record
    ```
    """

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        """
        Implement this method to deserialize the data received from the topic.
        """
        ...

deserialize(consumer_record, **kwargs) async

Implement this method to deserialize the data received from the topic.
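The docstring example calls `consumer_record.value.decode()` unconditionally, which raises `AttributeError` when a record's value is `None`. A slightly more defensive sketch follows. `FakeRecord` is a hypothetical stand-in for `ConsumerRecord` so the snippet runs without kstreams; the method keeps the `deserialize(consumer_record, **kwargs)` shape shown above.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeRecord:
    # Hypothetical stand-in for kstreams.ConsumerRecord
    value: Optional[Any]


class JsonDeserializer:
    async def deserialize(self, consumer_record: FakeRecord, **kwargs) -> FakeRecord:
        # Leave null values (e.g. tombstones) untouched instead of crashing
        if consumer_record.value is not None:
            consumer_record.value = json.loads(consumer_record.value.decode())
        return consumer_record


deserializer = JsonDeserializer()
record = asyncio.run(deserializer.deserialize(FakeRecord(value=b"[1, 2, 3]")))
print(record.value)  # [1, 2, 3]
empty = asyncio.run(deserializer.deserialize(FakeRecord(value=None)))
print(empty.value)  # None
```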


Warning

kstreams.serializers.Deserializer will be deprecated; use middlewares instead.

Usage

Once you have written your serializer or deserializer, there are two ways to use it: globally, for every stream, or per stream.

Initialize the engine with your serializers

By doing this, all streams will use these serializers by default.

from kstreams import create_engine

# JsonSerializer is the class defined earlier on this page
stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),
)

Initialize streams with a deserializer middleware and produce events with serializers

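from kstreams import ConsumerRecord, middleware

# stream_engine, JsonDeserializerMiddleware, topic, save_to_db and consts
# are assumed to be defined elsewhere in your application.


@stream_engine.stream(
    topic,
    middlewares=[middleware.Middleware(JsonDeserializerMiddleware)],
)
async def hello_stream(cr: ConsumerRecord):
    # cr.value is now a dict, decoded by JsonDeserializerMiddleware
    print(cr.value["message"])
    save_to_db(cr)


async def produce():
    # The engine's serializer turns the dict into bytes before sending
    await stream_engine.send(
        topic,
        value={"message": "test"},
        headers={"content-type": consts.APPLICATION_JSON},
        key="1",
    )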
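The `send` call attaches a `content-type` header. Assuming the engine passes those headers through to `serialize` (as the protocol's `headers` parameter suggests), a serializer could use the header to decide how to encode the payload. The sketch below assumes `consts.APPLICATION_JSON` holds the string `"application/json"` (its actual value is not shown on this page), passes `bytes` payloads through unchanged, and rejects anything else. This dispatch is an illustrative design, not built-in kstreams behavior.

```python
import asyncio
import json
from typing import Any, Dict, Optional

# Assumed value of consts.APPLICATION_JSON; adjust to your own constant
APPLICATION_JSON = "application/json"


class ContentTypeSerializer:
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        content_type = (headers or {}).get("content-type")
        if isinstance(payload, bytes):
            # Already encoded: send as-is
            return payload
        if content_type == APPLICATION_JSON:
            return json.dumps(payload).encode("utf-8")
        raise ValueError(f"Cannot serialize payload with content-type {content_type!r}")


serializer = ContentTypeSerializer()
json_bytes = asyncio.run(
    serializer.serialize({"message": "test"}, headers={"content-type": APPLICATION_JSON})
)
passthrough = asyncio.run(serializer.serialize(b"raw"))
print(json_bytes)  # b'{"message": "test"}'
print(passthrough)  # b'raw'
```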