Serialization

Kafka's job is to move bytes from producers to consumers through a topic.

By default, this is what kstreams does.

from kstreams import Stream

from .streams_roster import stream_roster

my_stream = Stream(
    "local--hello-world",
    func=stream_roster,
    config={
        "group_id": "example-group",
    },
)

In this setup, the value of the ConsumerRecord that stream_roster receives is raw bytes.

To keep your code pythonic, we provide a mechanism to serialize/deserialize these bytes into something more useful. This way, you can work with other data structures, like a dict or dataclasses.

Serializers

Sometimes it is easier to work with a dict in your app, give it to kstreams, and let it transform it into bytes to be delivered to Kafka. For this situation, you need to implement kstreams.serializers.Serializer.

kstreams.serializers.Serializer

Protocol used by the Stream to serialize.

A Protocol is similar to features in other languages, such as an interface or a trait.

End users should provide their own class implementing this protocol.

For example, a JsonSerializer:

from typing import Optional, Dict
import json

class JsonSerializer:

    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """Return UTF-8 encoded payload"""
        value = json.dumps(payload)
        return value.encode()

Notice that you don't need to inherit from anything; you just have to comply with the Protocol.
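To make the "no inheritance needed" point concrete, here is a minimal sketch of structural typing using only the standard library. `SerializerLike` is a hypothetical local stand-in that mirrors the `serialize` signature shown above; it is not imported from kstreams, so the snippet runs on its own.

```python
import asyncio
import json
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class SerializerLike(Protocol):
    """Hypothetical stand-in mirroring the serialize() signature (assumption)."""

    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes: ...


class JsonSerializer:
    # No base class: providing a matching `serialize` method is enough.
    async def serialize(
        self,
        payload: dict,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        return json.dumps(payload).encode()


serializer = JsonSerializer()
# runtime_checkable only verifies that a `serialize` attribute exists,
# not that its signature matches.
is_serializer = isinstance(serializer, SerializerLike)
encoded = asyncio.run(serializer.serialize({"message": "test"}))
print(is_serializer, encoded)
```

A static type checker performs the stricter signature comparison; the runtime check here is only a quick sanity test.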

Source code in kstreams/serializers.py
class Serializer(Protocol):
    """Protocol used by the Stream to serialize.

    A Protocol is similar to other languages features like an interface or a trait.

    End users should provide their own class implementing this protocol.

    For example a `JsonSerializer`

    ```python
    from typing import Optional, Dict
    import json

    class JsonSerializer:

        async def serialize(
            self,
            payload: dict,
            headers: Optional[Dict[str, str]] = None,
            serializer_kwargs: Optional[Dict] = None,
        ) -> bytes:
            \"""Return UTF-8 encoded payload\"""
            value = json.dumps(payload)
            return value.encode()
    ```

    Notice that you don't need to inherit anything,
    you just have to comply with the Protocol.
    """

    async def serialize(
        self,
        payload: Any,
        headers: Optional[Headers] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        """
        Implement this method to serialize the payload into the bytes sent to the topic.
        """
        ...  # pragma: no cover
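Since `payload` is typed as `Any`, a serializer is not limited to dicts. The following sketch, which uses only the standard library, accepts either a dataclass instance or a dict. `Greeting` and `DataclassJsonSerializer` are hypothetical names, and forwarding `serializer_kwargs` to `json.dumps` is a choice made for this sketch rather than documented library behaviour.

```python
import asyncio
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict, Optional


@dataclass
class Greeting:  # hypothetical event type used for illustration
    message: str
    retries: int = 0


class DataclassJsonSerializer:  # hypothetical name
    async def serialize(
        self,
        payload: Any,
        headers: Optional[Dict[str, str]] = None,
        serializer_kwargs: Optional[Dict] = None,
    ) -> bytes:
        # Convert dataclass instances (not dataclass types) to plain dicts first.
        if is_dataclass(payload) and not isinstance(payload, type):
            payload = asdict(payload)
        # Assumption: extra kwargs are passed straight to json.dumps.
        return json.dumps(payload, **(serializer_kwargs or {})).encode("utf-8")


encoded = asyncio.run(
    DataclassJsonSerializer().serialize(
        Greeting(message="test"),
        serializer_kwargs={"sort_keys": True},
    )
)
print(encoded)
```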

Deserializers

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with bytes, you may want to receive in your function the dict ready to be used. For those cases, we need to use middlewares.

Deserializers Middleware

For example, we can implement a JsonDeserializerMiddleware:

import json

from kstreams import middleware, ConsumerRecord


class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    async def __call__(self, cr: ConsumerRecord):
        if cr.value is not None:
            data = json.loads(cr.value.decode())
            cr.value = data
        return await self.next_call(cr)
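The decode-then-delegate pattern used by the middleware can be tried without a broker. The sketch below uses only the standard library: `FakeRecord` and `JsonDecodeStep` are hypothetical stand-ins for `ConsumerRecord` and a middleware class, and the way `next_call` is wired in the constructor is an assumption of this sketch, not the kstreams implementation.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class FakeRecord:
    """Hypothetical stand-in for ConsumerRecord; only `value` is modelled."""

    value: Any


class JsonDecodeStep:
    """Hypothetical stand-in for a middleware: decode, then call the next step."""

    def __init__(self, next_call: Callable[[FakeRecord], Awaitable[Any]]) -> None:
        self.next_call = next_call

    async def __call__(self, cr: FakeRecord) -> Any:
        if cr.value is not None:  # records without a value pass through untouched
            cr.value = json.loads(cr.value.decode())
        return await self.next_call(cr)


async def handler(cr: FakeRecord) -> Any:
    # Plays the role of the stream function: it sees the decoded value.
    return cr.value


async def main():
    step = JsonDecodeStep(next_call=handler)
    decoded = await step(FakeRecord(value=b'{"message": "test"}'))
    untouched = await step(FakeRecord(value=None))
    return decoded, untouched


decoded, untouched = asyncio.run(main())
print(decoded, untouched)
```

Checking for `None` before decoding matters because a record may arrive without a value, and calling `.decode()` on it would raise an AttributeError.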

Old Deserializers

The old-fashioned way is to use Deserializers, which have been deprecated (but are still maintained) in favor of middlewares.

Deserializers must implement the Deserializer Protocol.

Example

import json
from kstreams import ConsumerRecord

class JsonDeserializer:

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        data = json.loads(consumer_record.value.decode())
        consumer_record.value = data
        return consumer_record
Source code in kstreams/serializers.py
class Deserializer(Protocol):
    """Deserializers must implement the Deserializer Protocol

    !!! Example
        ```python
        import json
        from kstreams import ConsumerRecord

        class JsonDeserializer:

            async def deserialize(
                self, consumer_record: ConsumerRecord, **kwargs
            ) -> ConsumerRecord:
                data = json.loads(consumer_record.value.decode())
                consumer_record.value = data
                return consumer_record
        ```
    """

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        """
        Implement this method to deserialize the data received from the topic.
        """
        ...

Warning

kstreams.serializers.Deserializer will be deprecated; use middlewares instead.

Usage

Once you have written your serializer and middleware/deserializer, there are two ways to use them:

  • Globally: a Serializer and/or Deserializer is set on the StreamEngine instance.
  • Per case: a Serializer is passed to the send coroutine, or a Middleware/Deserializer is set on a stream.

Globally

The engine is initialized with a serializer and/or a deserializer. All streams then use that deserializer by default, and every produced event goes through the default serializer.

Json events example
from kstreams import create_engine, middleware, ConsumerRecord

topic = "local--kstreams"

stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),
    deserializer=JsonDeserializer(),  # old-fashioned way; it will be deprecated
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
    # remember cr.value is now a dict
    print(cr.value["message"])
    save_to_db(cr)
    assert cr.value == {"message": "test"}


await stream_engine.send(
    topic,
    value={"message": "test"},
    headers={"content-type": consts.APPLICATION_JSON},
    key="1",
)

Per case

In this case, individual streams are initialized with their own deserializer (preferably a middleware), and events are produced by passing a serializer to the send function.

  • If a global serializer is set but we call send(serializer=...), then the local serializer is used, not the global one.
  • If a global deserializer is set but a stream has a local one, then the local deserializer is used. In other words, the most specific deserializer is used.

from kstreams import create_engine, middleware, ConsumerRecord

topic = "local--kstreams"

# stream_engine created without a `serializer/deserializer`
stream_engine = create_engine(
    title="my-stream-engine",
)

# deserializer=JsonDeserializer() could be passed here instead, but it will be deprecated
@stream_engine.stream(topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)])
async def hello_stream(cr: ConsumerRecord):
    # remember cr.value is now a dict
    print(cr.value["message"])
    save_to_db(cr)


# send with a serializer
await stream_engine.send(
    topic,
    value={"message": "test"},
    headers={"content-type": consts.APPLICATION_JSON},
    serializer=JsonSerializer(),  # overrides the global serializer, if one is set
    key="1",
)

Forcing raw data

Sometimes a global serializer is in use but we still want to produce raw data, for example when producing to a DLQ. In this case, we must set the serializer option to None:

DLQ example
from kstreams import create_engine, middleware, ConsumerRecord


topic = "local--kstreams"
dlq_topic = "dlq--kstreams"

stream_engine = create_engine(
    title="my-stream-engine",
    serializer=JsonSerializer(),  # Global serializer
)


@stream_engine.stream(topic)
async def hello_stream(cr: ConsumerRecord):
    try:
        # remember cr.value is now a dict
        save_to_db(cr)
        assert cr.value == {"message": "test"}
    except DeserializationException:  # assumed to be defined elsewhere in the application
        await stream_engine.send(
            dlq_topic,
            value=cr.value,
            headers=cr.headers,
            key=cr.key,
            serializer=None,  # force raw data
        )


# this will produce JSON
await stream_engine.send(
    topic,
    value={"message": "test"},
    headers={"content-type": consts.APPLICATION_JSON},
    key="1",
)
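
The precedence rules described above (local over global, and an explicit None meaning "send raw bytes") imply that "argument omitted" and "argument set to None" must be distinguishable. The sketch below shows one common way to model this with a sentinel object. It is an illustration of the rule only, not the kstreams implementation; `UNSET`, `pick_serializer`, `encode`, and `ReprSerializer` are hypothetical names.

```python
import asyncio
import json
from typing import Any


class _Unset:
    """Sentinel type meaning 'no serializer argument was passed'."""


UNSET = _Unset()


class JsonSerializer:
    async def serialize(self, payload, headers=None, serializer_kwargs=None) -> bytes:
        return json.dumps(payload).encode()


class ReprSerializer:  # hypothetical second serializer, used as a local override
    async def serialize(self, payload, headers=None, serializer_kwargs=None) -> bytes:
        return repr(payload).encode()


def pick_serializer(global_serializer: Any, local_serializer: Any = UNSET) -> Any:
    # Omitted argument: fall back to the global serializer.
    if local_serializer is UNSET:
        return global_serializer
    # Explicit value, including None: the most specific choice wins.
    return local_serializer


async def encode(value: Any, global_serializer: Any, local_serializer: Any = UNSET) -> Any:
    serializer = pick_serializer(global_serializer, local_serializer)
    if serializer is None:
        return value  # raw data, passed through unchanged
    return await serializer.serialize(value)


async def main():
    global_json = JsonSerializer()
    uses_global = await encode({"message": "test"}, global_json)
    uses_local = await encode({"message": "test"}, global_json, ReprSerializer())
    raw = await encode(b"already-bytes", global_json, None)
    return uses_global, uses_local, raw


uses_global, uses_local, raw = asyncio.run(main())
print(uses_global, uses_local, raw)
```

A plain `serializer=None` default could not express the DLQ case, because it would be indistinguishable from "use whatever the engine was configured with".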