Skip to content

Utils

Utility functions

kstreams.utils

create_ssl_context(*, cafile=None, capath=None, cadata=None, certfile=None, keyfile=None, password=None, crlfile=None)

Wrapper of aiokafka.helpers.create_ssl_context with typehints.

Parameters:

Name Type Description Default
cafile Optional[str]

Certificate Authority file path containing certificates used to sign broker certificates

None
capath Optional[str]

Same as cafile, but points to a directory containing several CA certificates

None
cadata Union[str, bytes, None]

Same as cafile, but instead contains already read data in either ASCII or bytes format

None
certfile Optional[str]

optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate's authenticity

None
keyfile Optional[str]

optional filename containing the client private key.

None
password Optional[str]

optional password to be used when loading the certificate chain

None
Source code in kstreams/utils.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def create_ssl_context(
    *,
    cafile: Optional[str] = None,
    capath: Optional[str] = None,
    cadata: Union[str, bytes, None] = None,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
    password: Optional[str] = None,
    crlfile: Any = None,
):
    """Wrapper of [aiokafka.helpers.create_ssl_context](
        https://aiokafka.readthedocs.io/en/stable/api.html#helpers
    )
    with typehints.

    Arguments:
        cafile: Certificate Authority file path containing certificates
            used to sign broker certificates
        capath: Same as `cafile`, but points to a directory containing
            several CA certificates
        cadata: Same as `cafile`, but instead contains already
            read data in either ASCII or bytes format
        certfile: optional filename of file in PEM format containing
            the client certificate, as well as any CA certificates needed to
            establish the certificate's authenticity
        keyfile: optional filename containing the client private key.
        password: optional password to be used when loading the
            certificate chain

    """
    return aiokafka_create_ssl_context(
        cafile=cafile,
        capath=capath,
        cadata=cadata,
        certfile=certfile,
        keyfile=keyfile,
        password=password,
        crlfile=crlfile,
    )

create_ssl_context_from_mem(*, certdata, keydata, password=None, cadata=None)

Create a SSL context from data on memory.

This makes it easy to read the certificates from environmental variables Usually the data is loaded from env variables.

Parameters:

Name Type Description Default
cadata Optional[str]

certificates used to sign broker certificates provided as unicode str

None
certdata str

the client certificate, as well as any CA certificates needed to establish the certificate's authenticity provided as unicode str

required
keydata str

the client private key provided as unicode str

required
password Optional[str]

optional password to be used when loading the certificate chain

None
Source code in kstreams/utils.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def create_ssl_context_from_mem(
    *,
    certdata: str,
    keydata: str,
    password: Optional[str] = None,
    cadata: Optional[str] = None,
) -> Optional[ssl.SSLContext]:
    """Create a SSL context from data on memory.

    This makes it easy to read the certificates from environmental variables
    Usually the data is loaded from env variables.

    Arguments:
        cadata: certificates used to sign broker certificates provided as unicode str
        certdata: the client certificate, as well as any CA certificates needed to
            establish the certificate's authenticity provided as unicode str
        keydata: the client private key provided as unicode str
        password: optional password to be used when loading the
            certificate chain
    """
    with contextlib.ExitStack() as stack:
        cert_file = stack.enter_context(NamedTemporaryFile(suffix=".crt"))
        key_file = stack.enter_context(NamedTemporaryFile(suffix=".key"))

        # expecting unicode data, writing it as bytes to files as utf-8
        cert_file.write(certdata.encode("utf-8"))
        cert_file.flush()

        key_file.write(keydata.encode("utf-8"))
        key_file.flush()

        ssl_context = ssl.create_default_context(cadata=cadata)
        ssl_context.load_cert_chain(
            cert_file.name, keyfile=key_file.name, password=password
        )
        return ssl_context
    return None