Skip to content

Backends

The main idea of a backend is to supply the necessary configuration to create a connection with the backend.

kstreams currently has support for Kafka as a backend.

kstreams.backends.kafka.Kafka

The Kafka backend validates the given attributes.

It uses pydantic internally.

Attributes:

Name Type Description
bootstrap_servers List[str]

kafka list of hostname:port

security_protocol SecurityProtocol

Protocol used to communicate with brokers

ssl_context Optional[SSLContext]

a python std ssl.SSLContext instance, you can generate it with create_ssl_context or create_ssl_context_from_mem

sasl_mechanism SaslMechanism

Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL

sasl_plain_username Optional[str]

username for sasl PLAIN authentication

sasl_plain_password Optional[str]

password for sasl PLAIN authentication

sasl_oauth_token_provider Optional[str]

smth

Raises:

Type Description
ValidationError

a pydantic.ValidationError exception

PLAINTEXT

Example

from kstreams.backends.kafka import Kafka
from kstreams import create_engine, Stream

backend = Kafka(bootstrap_servers=["localhost:9092"])
stream_engine = create_engine(title="my-stream-engine", backend=backend)

SSL

Example

Create SSL context
import ssl

from kstreams.backends.kafka import Kafka
from kstreams import create_engine, utils, Stream


def get_ssl_context() -> ssl.SSLContext:
    return utils.create_ssl_context(
        cafile="certificate-authority-file-path",
        capath="points-to-directory-with-several-ca-certificates",
        cadata="same-as-cafile-but-ASCII-or-bytes-format",
        certfile="client-certificate-file-name",
        keyfile="client-private-key-file-name",
        password="password-to-load-certificate-chain",
    )

backend = Kafka(
    bootstrap_servers=["localhost:9094"],
    security_protocol="SSL",
    ssl_context=get_ssl_context(),
)

stream_engine = create_engine(title="my-stream-engine", backend=backend)

Example

Create SSL context from memory
import ssl

from kstreams.backends.kafka import Kafka
from kstreams import create_engine, utils, Stream


def get_ssl_context() -> ssl.SSLContext:
    return utils.create_ssl_context_from_mem(
        cadata="ca-certificates-as-unicode",
        certdata="client-certificate-as-unicode",
        keydata="client-private-key-as-unicode",
        password="optional-password-to-load-certificate-chain",
    )

backend = Kafka(
    bootstrap_servers=["localhost:9094"],
    security_protocol="SSL",
    ssl_context=get_ssl_context(),
)

stream_engine = create_engine(title="my-stream-engine", backend=backend)
Source code in kstreams/backends/kafka.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class Kafka(BaseModel):
    """
    The `Kafka` backend validates the given attributes.

    It uses pydantic internally.

    Attributes:
        bootstrap_servers: kafka list of `hostname:port`
        security_protocol: Protocol used to communicate with brokers
        ssl_context: a python std `ssl.SSLContext` instance, you can generate
            it with `create_ssl_context`
            or `create_ssl_context_from_mem`
        sasl_mechanism: Authentication mechanism when `security_protocol` is configured
            for `SASL_PLAINTEXT` or `SASL_SSL`
        sasl_plain_username: username for sasl PLAIN authentication
        sasl_plain_password: password for sasl PLAIN authentication
        sasl_oauth_token_provider: smth

    Raises:
        ValidationError: a `pydantic.ValidationError` exception

    ## PLAINTEXT

    !!! Example
        ```python
        from kstreams.backends.kafka import Kafka
        from kstreams import create_engine, Stream

        backend = Kafka(bootstrap_servers=["localhost:9092"])
        stream_engine = create_engine(title="my-stream-engine", backend=backend)
        ```

    ## SSL

    !!! Example
        ```python title="Create SSL context"
        import ssl

        from kstreams.backends.kafka import Kafka
        from kstreams import create_engine, utils, Stream


        def get_ssl_context() -> ssl.SSLContext:
            return utils.create_ssl_context(
                cafile="certificate-authority-file-path",
                capath="points-to-directory-with-several-ca-certificates",
                cadata="same-as-cafile-but-ASCII-or-bytes-format",
                certfile="client-certificate-file-name",
                keyfile="client-private-key-file-name",
                password="password-to-load-certificate-chain",
            )

        backend = Kafka(
            bootstrap_servers=["localhost:9094"],
            security_protocol="SSL",
            ssl_context=get_ssl_context(),
        )

        stream_engine = create_engine(title="my-stream-engine", backend=backend)
        ```

        !!! note
            Check [create ssl context util](https://kpn.github.io/kstreams/utils/#kstreams.utils.create_ssl_context)

    !!! Example
        ```python title="Create SSL context from memory"
        import ssl

        from kstreams.backends.kafka import Kafka
        from kstreams import create_engine, utils, Stream


        def get_ssl_context() -> ssl.SSLContext:
            return utils.create_ssl_context_from_mem(
                cadata="ca-certificates-as-unicode",
                certdata="client-certificate-as-unicode",
                keydata="client-private-key-as-unicode",
                password="optional-password-to-load-certificate-chain",
            )

        backend = Kafka(
            bootstrap_servers=["localhost:9094"],
            security_protocol="SSL",
            ssl_context=get_ssl_context(),
        )

        stream_engine = create_engine(title="my-stream-engine", backend=backend)
        ```

        !!! note
            Check [create ssl context from memerory util](https://kpn.github.io/kstreams/utils/#kstreams.utils.create_ssl_context_from_mem)
    """

    bootstrap_servers: List[str] = ["localhost:9092"]
    security_protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT

    ssl_context: Optional[ssl.SSLContext] = None

    sasl_mechanism: SaslMechanism = SaslMechanism.PLAIN
    sasl_plain_username: Optional[str] = None
    sasl_plain_password: Optional[str] = None
    sasl_oauth_token_provider: Optional[str] = None
    model_config = ConfigDict(arbitrary_types_allowed=True, use_enum_values=True)

    @model_validator(mode="after")
    @classmethod
    def protocols_validation(cls, values):
        security_protocol = values.security_protocol

        if security_protocol == SecurityProtocol.PLAINTEXT:
            return values
        elif security_protocol == SecurityProtocol.SSL:
            if values.ssl_context is None:
                raise ValueError("`ssl_context` is required")
            return values
        elif security_protocol == SecurityProtocol.SASL_PLAINTEXT:
            if values.sasl_mechanism is SaslMechanism.OAUTHBEARER:
                # We don't perform a username and password check if OAUTHBEARER
                return values
            if (
                values.sasl_mechanism is SaslMechanism.PLAIN
                and values.sasl_plain_username is None
            ):
                raise ValueError(
                    "`sasl_plain_username` is required when using SASL_PLAIN"
                )
            if (
                values.sasl_mechanism is SaslMechanism.PLAIN
                and values.sasl_plain_password is None
            ):
                raise ValueError(
                    "`sasl_plain_password` is required when using SASL_PLAIN"
                )
            return values
        elif security_protocol == SecurityProtocol.SASL_SSL:
            if values.ssl_context is None:
                raise ValueError("`ssl_context` is required")
            if (
                values.sasl_mechanism is SaslMechanism.PLAIN
                and values.sasl_plain_username is None
            ):
                raise ValueError(
                    "`sasl_plain_username` is required when using SASL_PLAIN"
                )
            if (
                values.sasl_mechanism is SaslMechanism.PLAIN
                and values.sasl_plain_password is None
            ):
                raise ValueError(
                    "`sasl_plain_password` is required when using SASL_PLAIN"
                )
            return values