Monitoring

This page discusses how to monitor your application using the Kafka metrics that are exposed to Prometheus.

Before we begin, it's worth noting that Kafka itself exposes a number of useful metrics, covering the cluster, brokers, and clients (producers and consumers).

This means that we can quickly add graphs to our dashboards by using the metrics that are already exposed.

kstreams also includes its own collection of metrics. See the Metrics Docs for more information.
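
Since the PrometheusMonitor described below registers its gauges with the prometheus_client default registry, one possible way to expose them is via prometheus_client's built-in HTTP server. A minimal sketch; the port and engine title are illustrative, and in a web application you would more likely serve a /metrics endpoint through your framework:

from prometheus_client import start_http_server

from kstreams import create_engine

stream_engine = create_engine(title="my-engine")

# expose the default prometheus_client registry, where the kstreams
# gauges live, so Prometheus can scrape them on port 8000
start_http_server(8000)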

kstreams.PrometheusMonitor

Metrics monitor to keep track of Producers and Consumers.

Attributes:

  • metrics_scrape_time (float): Number of seconds the monitor waits between scrape iterations.

Source code in kstreams/prometheus/monitor.py
class PrometheusMonitor:
    """
    Metrics monitor to keep track of Producers and Consumers.

    Attributes:
        metrics_scrape_time (float): Number of seconds the monitor
            waits between scrape iterations
    """

    # Producer metrics
    MET_OFFSETS = Gauge(
        "topic_partition_offsets", "help producer offsets", ["topic", "partition"]
    )

    # Consumer metrics
    MET_COMMITTED = Gauge(
        "consumer_committed",
        "help consumer committed",
        ["topic", "partition", "consumer_group"],
    )
    MET_POSITION = Gauge(
        "consumer_position",
        "help consumer position",
        ["topic", "partition", "consumer_group"],
    )
    MET_HIGHWATER = Gauge(
        "consumer_highwater",
        "help consumer highwater",
        ["topic", "partition", "consumer_group"],
    )
    MET_LAG = Gauge(
        "consumer_lag",
        "help consumer lag calculated using the last commited offset",
        ["topic", "partition", "consumer_group"],
    )
    MET_POSITION_LAG = Gauge(
        "position_lag",
        "help consumer position lag calculated using the consumer position",
        ["topic", "partition", "consumer_group"],
    )

    def __init__(self, metrics_scrape_time: float = 3):
        self.metrics_scrape_time = metrics_scrape_time
        self.running = False
        self._producer = None
        self._streams: List[Stream] = []

    async def start(self) -> None:
        self.running = True
        logger.info("Starting Prometheus Monitoring started...")
        await self._metrics_task()

    async def stop(self) -> None:
        self.running = False
        self._clean_consumer_metrics()
        logger.info("Prometheus Monitoring stopped...")

    def add_topic_partition_offset(
        self, topic: str, partition: int, offset: int
    ) -> None:
        self.MET_OFFSETS.labels(topic=topic, partition=partition).set(offset)

    def _add_consumer_metrics(self, metrics_dict: MetricsType):
        for topic_partition, partitions_metadata in metrics_dict.items():
            group_id = partitions_metadata["group_id"]
            position = partitions_metadata["position"]
            committed = partitions_metadata["committed"]
            highwater = partitions_metadata["highwater"]
            lag = partitions_metadata["lag"]
            position_lag = partitions_metadata["position_lag"]

            self.MET_COMMITTED.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(committed or 0)
            self.MET_POSITION.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(position or -1)
            self.MET_HIGHWATER.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(highwater or 0)
            self.MET_LAG.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(lag or 0)
            self.MET_POSITION_LAG.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(position_lag or 0)

    def _clean_consumer_metrics(self) -> None:
        """
        This method should be called when a rebalance takes place
        to clean all consumer metrics. When the rebalance finishes,
        new metrics will be generated per consumer based on the
        consumer assignments
        """
        self.MET_LAG.clear()
        self.MET_POSITION_LAG.clear()
        self.MET_COMMITTED.clear()
        self.MET_POSITION.clear()
        self.MET_HIGHWATER.clear()

    def clean_stream_consumer_metrics(self, consumer: Consumer) -> None:
        topic_partitions = consumer.assignment()
        group_id = consumer._group_id
        for topic_partition in topic_partitions:
            topic = topic_partition.topic
            partition = topic_partition.partition

            metrics_found = False
            for sample in list(self.MET_LAG.collect())[0].samples:
                if {
                    "topic": topic,
                    "partition": str(partition),
                    "consumer_group": group_id,
                } == sample.labels:
                    metrics_found = True

            if metrics_found:
                self.MET_LAG.remove(topic, partition, group_id)
                self.MET_POSITION_LAG.remove(topic, partition, group_id)
                self.MET_COMMITTED.remove(topic, partition, group_id)
                self.MET_POSITION.remove(topic, partition, group_id)
                self.MET_HIGHWATER.remove(topic, partition, group_id)
            else:
                logger.debug(
                    "Metrics for consumer with group-id: "
                    f"{consumer._group_id} not found"
                )

    def add_producer(self, producer):
        self._producer = producer

    def add_streams(self, streams):
        self._streams = streams

    async def generate_consumer_metrics(self, consumer: Consumer):
        """
        Generate Consumer Metrics for Prometheus

        Format:
            {
                TopicPartition(topic="topic-1", partition=1): {
                    "group_id": "group-id-1",
                    "committed": committed,
                    "position": position,
                    "highwater": highwater,
                    "lag": lag,
                    "position_lag": position_lag,
                },
                ...
            }
        """
        metrics: MetricsType = DefaultDict(dict)

        topic_partitions = consumer.assignment()

        for topic_partition in topic_partitions:
            committed = await consumer.committed(topic_partition) or 0
            position = await consumer.position(topic_partition)
            highwater = consumer.highwater(topic_partition)

            lag = position_lag = None
            if highwater:
                lag = highwater - committed
                position_lag = highwater - position

            metrics[topic_partition] = {
                "group_id": consumer._group_id,
                "committed": committed,
                "position": position,
                "highwater": highwater,
                "lag": lag,
                "position_lag": position_lag,
            }

        self._add_consumer_metrics(metrics)

    async def _metrics_task(self) -> None:
        """
        Task that runs in the `background` to generate
        consumer metrics.

        When self.running is False the task will finish and it
        will be safe to stop consumers and producers.
        """
        while self.running:
            await asyncio.sleep(self.metrics_scrape_time)
            for stream in self._streams:
                if stream.consumer is not None:
                    try:
                        await self.generate_consumer_metrics(stream.consumer)
                    except RuntimeError:
                        logger.debug(
                            f"Metrics for stream {stream.name} cannot be generated, "
                            "probably because it has been removed"
                        )

generate_consumer_metrics(consumer) async

Generate Consumer Metrics for Prometheus

Format

    {
        TopicPartition(topic="topic-1", partition=1): {
            "group_id": "group-id-1",
            "committed": committed,
            "position": position,
            "highwater": highwater,
            "lag": lag,
            "position_lag": position_lag,
        },
        ...
    }

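A monitor with a custom scrape interval can be passed to the engine when it is created. A minimal sketch (the engine title is illustrative):

from kstreams import PrometheusMonitor, create_engine

# scrape consumer metrics every 10 seconds instead of the default 3
monitor = PrometheusMonitor(metrics_scrape_time=10)
stream_engine = create_engine(title="my-engine", monitor=monitor)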

Consumer Metrics

We advise including consumer_lag in your application's Grafana dashboard.

consumer_lag will show you how far your consumers are lagging behind the events published in the topics they read. For instance, if another team is producing millions of events and you have a single consumer, that consumer might not be able to handle them in time (where "in time" is defined by you, e.g. "a message should be consumed within an hour of being received").

Based on the lag, you will have to define your own alerts, for example pushing an alert to Slack whenever the lag exceeds a given threshold.

To build a basic dashboard using consumer_lag, you will need your consumer_group name.

We could add a query in Grafana like this:

sum(kafka_consumer_group_ConsumerLagMetrics_Value{topic =~ "YOUR_OWN_TOPIC_NAME", groupId =~"YOUR_CONSUMER_GROUP", name="SumOffsetLag"}) by (topic)

Remember to replace YOUR_CONSUMER_GROUP and YOUR_OWN_TOPIC_NAME with your consumer_group and topic respectively ⬆️
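
The query above uses metrics exported from Kafka itself (e.g. via a JMX exporter). If you scrape the metrics exposed by kstreams instead, the consumer_lag gauge defined above can be queried directly; a possible equivalent (label values are placeholders):

sum(consumer_lag{topic =~ "YOUR_OWN_TOPIC_NAME", consumer_group =~ "YOUR_CONSUMER_GROUP"}) by (topic)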

Producer Metrics

If you have producers, it's a good idea to monitor the growth of the Log End Offset (LEO).

The increase in LEO indicates the number of events produced in the last N minutes.

If you know that events should be produced every N minutes, you can use this metric to trigger an alert when no events arrive.

We could add a query in Grafana like this, where N is 10m:

sum(max(increase(kafka_log_Log_Value{name="LogEndOffset", topic =~ "TOPIC_NAME"}[10m])) by (partition, topic)) by (topic)

Remember to modify TOPIC_NAME to the name of the topic you want to track ⬆️
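
Similarly, if your application reports produced offsets through the topic_partition_offsets gauge shown above, you can approximate the same signal from kstreams' own metrics. A possible query (the topic name is a placeholder; delta is used because this metric is a gauge rather than a counter):

sum(delta(topic_partition_offsets{topic =~ "TOPIC_NAME"}[10m])) by (topic)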

Custom Business Metrics

One benefit of Prometheus is that you can design your own custom metrics.

Scenario: Consider an event-based ordering system. Assume you receive X orders daily and ship Y orders daily. Most likely, you will create a dashboard using this data.

Fortunately, we can create our own custom metrics by using the Prometheus Python client.

You can construct a variety of metric types with Prometheus:

  • Gauge
  • Counter
  • Histogram
  • Summary

You can read more about them on the Prometheus metric_types page.

In our scenario, we will most likely want a Counter for orders received and a Counter for orders shipped.

from prometheus_client import Counter
from kstreams import PrometheusMonitor

class MyAppPrometheusMonitor(PrometheusMonitor):
    def __init__(self):
        super().__init__() # initialize kstream metrics
        self.orders_received = Counter('orders_received', 'Number of orders received')
        self.orders_shipped = Counter('orders_shipped', 'Number of orders shipped')

    def increase_received(self, amount: int = 1):
        self.orders_received.inc(amount)

    def increase_shipped(self, amount: int = 1):
        self.orders_shipped.inc(amount)

In our kstreams app, we can:

from kstreams import ConsumerRecord, create_engine

stream_engine = create_engine(title="my-engine", monitor=MyAppPrometheusMonitor())

@stream_engine.stream("my-special-orders")
async def consume_orders_received(cr: ConsumerRecord):
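    # assuming records are deserialized into objects with a `status` attribute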
    if cr.value.status == "NEW":
        stream_engine.monitor.increase_received()
    elif cr.value.status == "SHIPPED":
        stream_engine.monitor.increase_shipped()

Your app's Prometheus endpoint will expose this data, which you can use to build a stylish ✨dashboard✨.

For further details, see the Prometheus Python client documentation.