Monitoring
This page discusses how to monitor your application using the Kafka metrics that are accessible in Prometheus.
Before we begin, it's worth noting that Kafka itself exposes a number of useful metrics, covering the cluster, brokers, and clients (producers and consumers).
This means that we can quickly add some graphs to our dashboards by utilizing the already-exposed metrics.
Kstreams
includes a collection of metrics. See Metrics Docs for more information.
kstreams.PrometheusMonitor
Metrics monitor to keep track of Producers and Consumers.
Attributes:
metrics_scrape_time float: Number of seconds that the monitor
waits between scrape iterations
Source code in kstreams/prometheus/monitor.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class PrometheusMonitor:
    """
    Metrics monitor to keep track of Producers and Consumers.

    Attributes:
        metrics_scrape_time float: Number of seconds that the monitor
            will wait until the next scrape iteration
    """

    # Producer metrics
    MET_OFFSETS = Gauge(
        "topic_partition_offsets", "help producer offsets", ["topic", "partition"]
    )

    # Consumer metrics
    MET_COMMITTED = Gauge(
        "consumer_committed",
        "help consumer committed",
        ["topic", "partition", "consumer_group"],
    )
    MET_POSITION = Gauge(
        "consumer_position",
        "help consumer position",
        ["topic", "partition", "consumer_group"],
    )
    MET_HIGHWATER = Gauge(
        "consumer_highwater",
        "help consumer highwater",
        ["topic", "partition", "consumer_group"],
    )
    MET_LAG = Gauge(
        "consumer_lag",
        # NOTE: help text typo fixed ("commited" -> "committed")
        "help consumer lag calculated using the last committed offset",
        ["topic", "partition", "consumer_group"],
    )
    MET_POSITION_LAG = Gauge(
        "position_lag",
        "help consumer position lag calculated using the consumer position",
        ["topic", "partition", "consumer_group"],
    )

    def __init__(self, metrics_scrape_time: float = 3):
        self.metrics_scrape_time = metrics_scrape_time
        # Flag read by the background scrape loop; setting it to False
        # lets the loop finish gracefully.
        self.running = False
        self._producer = None
        self._streams: List[Stream] = []

    async def start(self) -> None:
        """Start the monitor and run the scrape loop until stopped."""
        self.running = True
        # Fixed garbled message ("Starting ... started...")
        logger.info("Starting Prometheus Monitoring...")
        await self._metrics_task()

    async def stop(self) -> None:
        """Stop the scrape loop and clear all consumer metrics."""
        self.running = False
        self._clean_consumer_metrics()
        logger.info("Prometheus Monitoring stopped...")

    def add_topic_partition_offset(
        self, topic: str, partition: int, offset: int
    ) -> None:
        """Record the latest produced offset for a topic/partition pair."""
        self.MET_OFFSETS.labels(topic=topic, partition=partition).set(offset)

    def _add_consumer_metrics(self, metrics_dict: MetricsType):
        """Publish one gauge sample per topic-partition from *metrics_dict*.

        ``None`` values fall back to 0 (or -1 for position) so the gauges
        always carry a numeric sample.
        """
        for topic_partition, partitions_metadata in metrics_dict.items():
            group_id = partitions_metadata["group_id"]
            position = partitions_metadata["position"]
            committed = partitions_metadata["committed"]
            highwater = partitions_metadata["highwater"]
            lag = partitions_metadata["lag"]
            position_lag = partitions_metadata["position_lag"]

            self.MET_COMMITTED.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(committed or 0)
            self.MET_POSITION.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(position or -1)
            self.MET_HIGHWATER.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(highwater or 0)
            self.MET_LAG.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(lag or 0)
            self.MET_POSITION_LAG.labels(
                topic=topic_partition.topic,
                partition=topic_partition.partition,
                consumer_group=group_id,
            ).set(position_lag or 0)

    def _clean_consumer_metrics(self) -> None:
        """
        This method should be called when a rebalance takes place
        to clean all consumers metrics. When the rebalance finishes
        new metrics will be generated per consumer based on the
        consumer assignments
        """
        self.MET_LAG.clear()
        self.MET_POSITION_LAG.clear()
        self.MET_COMMITTED.clear()
        self.MET_POSITION.clear()
        self.MET_HIGHWATER.clear()

    def clean_stream_consumer_metrics(self, consumer: Consumer) -> None:
        """Remove the metric series owned by *consumer*'s current assignment."""
        topic_partitions = consumer.assignment()
        group_id = consumer._group_id
        for topic_partition in topic_partitions:
            topic = topic_partition.topic
            partition = topic_partition.partition

            metrics_found = False
            for sample in list(self.MET_LAG.collect())[0].samples:
                if {
                    "topic": topic,
                    "partition": str(partition),
                    "consumer_group": group_id,
                } == sample.labels:
                    metrics_found = True
                    break  # one match is enough; no need to scan the rest

            if metrics_found:
                self.MET_LAG.remove(topic, partition, group_id)
                self.MET_POSITION_LAG.remove(topic, partition, group_id)
                self.MET_COMMITTED.remove(topic, partition, group_id)
                self.MET_POSITION.remove(topic, partition, group_id)
                self.MET_HIGHWATER.remove(topic, partition, group_id)
            else:
                logger.debug(
                    "Metrics for consumer with group-id: "
                    f"{consumer._group_id} not found"
                )

    def add_producer(self, producer):
        self._producer = producer

    def add_streams(self, streams):
        self._streams = streams

    async def generate_consumer_metrics(self, consumer: Consumer):
        """
        Generate Consumer Metrics for Prometheus

        Format:
            {
                "topic-1": {
                    "1": (
                        [topic-1, partition-number, 'group-id-1'],
                        committed, position, highwater, lag, position_lag
                    )
                    "2": (
                        [topic-1, partition-number, 'group-id-1'],
                        committed, position, highwater, lag, position_lag
                    )
                },
                ...
                "topic-n": {
                    "1": (
                        [topic-n, partition-number, 'group-id-n'],
                        committed, position, highwater, lag, position_lag
                    )
                    "2": (
                        [topic-n, partition-number, 'group-id-n'],
                        committed, position, highwater, lag, position_lag
                    )
                }
            }
        """
        metrics: MetricsType = DefaultDict(dict)

        topic_partitions = consumer.assignment()

        for topic_partition in topic_partitions:
            committed = await consumer.committed(topic_partition) or 0
            position = await consumer.position(topic_partition)
            highwater = consumer.highwater(topic_partition)

            lag = position_lag = None
            if highwater:
                lag = highwater - committed
                position_lag = highwater - position

            metrics[topic_partition] = {
                "group_id": consumer._group_id,
                "committed": committed,
                "position": position,
                "highwater": highwater,
                "lag": lag,
                "position_lag": position_lag,
            }

        self._add_consumer_metrics(metrics)

    async def _metrics_task(self) -> None:
        """
        Task that runs in `background` to generate
        consumer metrics.

        When self.running is False the task will finish and it
        will be safe to stop consumers and producers.
        """
        while self.running:
            await asyncio.sleep(self.metrics_scrape_time)
            for stream in self._streams:
                if stream.consumer is not None:
                    try:
                        await self.generate_consumer_metrics(stream.consumer)
                    except RuntimeError:
                        logger.debug(
                            f"Metrics for stream {stream.name} can not be generated "
                            "probably because it has been removed"
                        )
|
generate_consumer_metrics(consumer)
async
Generate Consumer Metrics for Prometheus
Format
{
"topic-1": {
"1": (
[topic-1, partition-number, 'group-id-1'],
committed, position, highwater, lag, position_lag
)
"2": (
[topic-1, partition-number, 'group-id-1'],
committed, position, highwater, lag, position_lag
)
},
...
"topic-n": {
"1": (
[topic-n, partition-number, 'group-id-n'],
committed, position, highwater, lag, position_lag
)
"2": (
[topic-n, partition-number, 'group-id-n'],
committed, position, highwater, lag, position_lag
)
}
}
Source code in kstreams/prometheus/monitor.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def generate_consumer_metrics(self, consumer: Consumer):
    """
    Generate Consumer Metrics for Prometheus

    Format:
        {
            "topic-1": {
                "1": (
                    [topic-1, partition-number, 'group-id-1'],
                    committed, position, highwater, lag, position_lag
                )
                "2": (
                    [topic-1, partition-number, 'group-id-1'],
                    committed, position, highwater, lag, position_lag
                )
            },
            ...
            "topic-n": {
                "1": (
                    [topic-n, partition-number, 'group-id-n'],
                    committed, position, highwater, lag, position_lag
                )
                "2": (
                    [topic-n, partition-number, 'group-id-n'],
                    committed, position, highwater, lag, position_lag
                )
            }
        }
    """
    metrics: MetricsType = DefaultDict(dict)

    # One entry per topic-partition currently assigned to this consumer.
    for tp in consumer.assignment():
        committed = await consumer.committed(tp) or 0
        position = await consumer.position(tp)
        highwater = consumer.highwater(tp)

        # Lags can only be derived when the broker reported a highwater mark.
        if highwater:
            lag = highwater - committed
            position_lag = highwater - position
        else:
            lag = position_lag = None

        metrics[tp] = {
            "group_id": consumer._group_id,
            "committed": committed,
            "position": position,
            "highwater": highwater,
            "lag": lag,
            "position_lag": position_lag,
        }

    self._add_consumer_metrics(metrics)
|
Consumer Metrics
We advise including the consumer_lag
metric in your application's Grafana dashboard.
consumer_lag
will show you how far your consumers are lagging behind the published events in the topic they are reading. For instance, if you have a single consumer and another team is producing millions of events, the consumer might not be able to handle them in time (where in time is defined by you, like: "in an hour of receiving a message it should be consumed").
Based on the lag, you will have to develop your own alerts. An alert should be pushed to Slack if you experience more than a particular amount of lag.
You will require your consumer_group
name in order to design a basic dashboard using the consumer_lag
.
We could add a query in Grafana like this:
sum(kafka_consumer_group_ConsumerLagMetrics_Value{topic =~ "YOUR_OWN_TOPIC_NAME", groupId =~"YOUR_CONSUMER_GROUP", name="SumOffsetLag"}) by (topic)
Remember to replace YOUR_CONSUMER_GROUP
and YOUR_OWN_TOPIC_NAME
with your consumer_group
and topic
respectively ⬆️
Producer Metrics
If you have producers, it's a good idea to monitor the growth of Log End Offset (LEO).
The increase in LEO indicates the number of events produced in the last N
minutes.
If you know that events should occur every N
minutes, you can trigger alerts if no events occur because this metric will tell you whether or not events occurred.
We could add a query in Grafana like this, where N
is 10m
:
sum(max(increase(kafka_log_Log_Value{name="LogEndOffset", topic =~ "TOPIC_NAME"}[10m])) by (partition, topic)) by (topic)
Remember to modify TOPIC_NAME
to the name of the topic you want to track ⬆️
Custom Business Metrics
One benefit of Prometheus is that you can design your own custom metrics.
Scenario: Consider an event-based ordering system. Assume you receive X orders daily and ship Y orders daily. Most likely, you will create a dashboard using this data.
Fortunately, we can create our own custom metrics by using the Prometheus Python client.
You can construct a variety of metrics with prometheus:
Gauge
Counter
Histogram
Summary
You can read more about it in prometheus metric_types website.
In our scenario, we will most likely want a Counter
for orders received and a Counter
for orders shipped.
from prometheus_client import Counter
from kstreams import PrometheusMonitor
class MyAppPrometheusMonitor(PrometheusMonitor):
    """Kstreams monitor extended with custom order-tracking counters."""

    def __init__(self):
        super().__init__()  # initialize kstream metrics
        self.orders_received = Counter('orders_received', 'Amount of orders received')
        self.orders_shipped = Counter('orders_shipped', 'Amount of orders shipped')

    def increase_received(self, amount: int = 1):
        """Count *amount* newly received orders."""
        self.orders_received.inc(amount)

    def increase_shipped(self, amount: int = 1):
        """Count *amount* newly shipped orders."""
        self.orders_shipped.inc(amount)
In our kstreams app, we can:
stream_engine = create_engine(title="my-engine", monitor=MyAppPrometheusMonitor())


@stream_engine.stream("my-special-orders")
async def consume_orders_received(cr: ConsumerRecord):
    # Bump the matching business counter based on the order status.
    status = cr.value.status
    if status == "NEW":
        stream_engine.monitor.increase_received()
    elif status == "SHIPPED":
        stream_engine.monitor.increase_shipped()
Your app's Prometheus endpoint would expose this data, which you could use to build a stylish ✨dashboard✨ in Grafana.
For further details, see the Prometheus python client documentation.