/src/fluent-bit/lib/librdkafka-2.10.1/src/rdkafka_telemetry_encode.c
Line | Count | Source |
1 | | /* |
2 | | * librdkafka - Apache Kafka C library |
3 | | * |
4 | | * Copyright (c) 2023, Confluent Inc. |
5 | | * All rights reserved. |
6 | | * |
7 | | * Redistribution and use in source and binary forms, with or without |
8 | | * modification, are permitted provided that the following conditions are met: |
9 | | * |
10 | | * 1. Redistributions of source code must retain the above copyright notice, |
11 | | * this list of conditions and the following disclaimer. |
12 | | * 2. Redistributions in binary form must reproduce the above copyright notice, |
13 | | * this list of conditions and the following disclaimer in the documentation |
14 | | * and/or other materials provided with the distribution. |
15 | | * |
16 | | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
17 | | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
18 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
19 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
20 | | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
21 | | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
22 | | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
23 | | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
24 | | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
25 | | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
26 | | * POSSIBILITY OF SUCH DAMAGE. |
27 | | */ |
28 | | |
29 | | #include "rdkafka_telemetry_encode.h" |
30 | | #include "nanopb/pb_encode.h" |
31 | | #include "opentelemetry/metrics.pb.h" |
32 | | |
33 | | #define THREE_ORDERS_MAGNITUDE 1000 |
34 | | |
35 | | typedef struct { |
36 | | opentelemetry_proto_metrics_v1_Metric **metrics; |
37 | | size_t count; |
38 | | } rd_kafka_telemetry_metrics_repeated_t; |
39 | | |
40 | | typedef struct { |
41 | | opentelemetry_proto_common_v1_KeyValue **key_values; |
42 | | size_t count; |
43 | | } rd_kafka_telemetry_key_values_repeated_t; |
44 | | |
45 | | #define calculate_avg(_avg_, _scale_factor_) \ |
46 | 0 | ((_avg_).ra_v.avg / (double)_scale_factor_) |
47 | | |
48 | | #define calculate_max(_avg_, _scale_factor_) \ |
49 | 0 | RD_CEIL_INTEGER_DIVISION((_avg_).ra_v.maxv, _scale_factor_) |
50 | | |
51 | | #define brokers_avg(_rk_, _avg_name_, _scale_factor_, _metric_) \ |
52 | 0 | do { \ |
53 | 0 | rd_kafka_broker_t *_rkb_; \ |
54 | 0 | double avg = 0; \ |
55 | 0 | int count = 0; \ |
56 | 0 | TAILQ_FOREACH(_rkb_, &(_rk_)->rk_brokers, rkb_link) { \ |
57 | 0 | rd_avg_t *rd_avg_rollover = \ |
58 | 0 | &_rkb_->rkb_telemetry.rd_avg_rollover._avg_name_; \ |
59 | 0 | if (rd_avg_rollover->ra_v.cnt) { \ |
60 | 0 | avg = (avg * count + \ |
61 | 0 | rd_avg_rollover->ra_v.sum) / \ |
62 | 0 | (double)(count + \ |
63 | 0 | rd_avg_rollover->ra_v.cnt); \ |
64 | 0 | count += rd_avg_rollover->ra_v.cnt; \ |
65 | 0 | } \ |
66 | 0 | } \ |
67 | 0 | if (_scale_factor_ > 1) \ |
68 | 0 | (_metric_).double_value = avg / _scale_factor_; \ |
69 | 0 | else \ |
70 | 0 | (_metric_).double_value = avg; \ |
71 | 0 | } while (0) |
72 | | |
73 | | #define brokers_max(_rk_, _avg_name_, _scale_factor_, _metric_) \ |
74 | 0 | do { \ |
75 | 0 | rd_kafka_broker_t *_rkb_; \ |
76 | 0 | _metric_.int_value = 0; \ |
77 | 0 | TAILQ_FOREACH(_rkb_, &(_rk_)->rk_brokers, rkb_link) { \ |
78 | 0 | _metric_.int_value = \ |
79 | 0 | RD_MAX(_metric_.int_value, \ |
80 | 0 | _rkb_->rkb_telemetry.rd_avg_rollover \ |
81 | 0 | ._avg_name_.ra_v.maxv); \ |
82 | 0 | } \ |
83 | 0 | if (_scale_factor_ > 1) \ |
84 | 0 | (_metric_).int_value = RD_CEIL_INTEGER_DIVISION( \ |
85 | 0 | (_metric_).int_value, _scale_factor_); \ |
86 | 0 | } while (0) |
87 | | |
88 | | static rd_kafka_telemetry_metric_value_t |
89 | | calculate_connection_creation_total(rd_kafka_t *rk, |
90 | | rd_kafka_broker_t *rkb_selected, |
91 | 0 | rd_ts_t now_ns) { |
92 | 0 | rd_kafka_telemetry_metric_value_t total; |
93 | 0 | rd_kafka_broker_t *rkb; |
94 | |
|
95 | 0 | total.int_value = 0; |
96 | 0 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
97 | 0 | const int32_t connects = rd_atomic32_get(&rkb->rkb_c.connects); |
98 | 0 | if (!rk->rk_telemetry.delta_temporality) |
99 | 0 | total.int_value += connects; |
100 | 0 | else |
101 | 0 | total.int_value += |
102 | 0 | connects - |
103 | 0 | rkb->rkb_telemetry.rkb_historic_c.connects; |
104 | 0 | } |
105 | |
|
106 | 0 | return total; |
107 | 0 | } |
108 | | |
109 | | static rd_kafka_telemetry_metric_value_t |
110 | | calculate_connection_creation_rate(rd_kafka_t *rk, |
111 | | rd_kafka_broker_t *rkb_selected, |
112 | 0 | rd_ts_t now_ns) { |
113 | 0 | rd_kafka_telemetry_metric_value_t total; |
114 | 0 | rd_kafka_broker_t *rkb; |
115 | 0 | rd_ts_t ts_last = rk->rk_telemetry.rk_historic_c.ts_last; |
116 | |
|
117 | 0 | total.double_value = 0; |
118 | 0 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
119 | 0 | total.double_value += |
120 | 0 | rd_atomic32_get(&rkb->rkb_c.connects) - |
121 | 0 | rkb->rkb_telemetry.rkb_historic_c.connects; |
122 | 0 | } |
123 | 0 | double seconds = (now_ns - ts_last) / 1e9; |
124 | 0 | if (seconds > 1.0) |
125 | 0 | total.double_value /= seconds; |
126 | 0 | return total; |
127 | 0 | } |
128 | | |
129 | | static rd_kafka_telemetry_metric_value_t |
130 | | calculate_broker_avg_rtt(rd_kafka_t *rk, |
131 | | rd_kafka_broker_t *rkb_selected, |
132 | 0 | rd_ts_t now_ns) { |
133 | 0 | rd_kafka_telemetry_metric_value_t avg_rtt = RD_ZERO_INIT; |
134 | 0 | avg_rtt.double_value = calculate_avg( |
135 | 0 | rkb_selected->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt, |
136 | 0 | THREE_ORDERS_MAGNITUDE); |
137 | 0 | return avg_rtt; |
138 | 0 | } |
139 | | |
140 | | static rd_kafka_telemetry_metric_value_t |
141 | | calculate_broker_max_rtt(rd_kafka_t *rk, |
142 | | rd_kafka_broker_t *rkb_selected, |
143 | 0 | rd_ts_t now_ns) { |
144 | 0 | rd_kafka_telemetry_metric_value_t max_rtt = RD_ZERO_INIT; |
145 | 0 | max_rtt.int_value = calculate_max( |
146 | 0 | rkb_selected->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt, |
147 | 0 | THREE_ORDERS_MAGNITUDE); |
148 | 0 | return max_rtt; |
149 | 0 | } |
150 | | |
151 | | static rd_kafka_telemetry_metric_value_t |
152 | | calculate_produce_latency_avg(rd_kafka_t *rk, |
153 | | rd_kafka_broker_t *rkb_selected, |
154 | 0 | rd_ts_t now_ns) { |
155 | 0 | rd_kafka_telemetry_metric_value_t avg_rtt = RD_ZERO_INIT; |
156 | 0 | brokers_avg(rk, rkb_avg_produce_latency, THREE_ORDERS_MAGNITUDE, |
157 | 0 | avg_rtt); |
158 | 0 | return avg_rtt; |
159 | 0 | } |
160 | | |
161 | | static rd_kafka_telemetry_metric_value_t |
162 | | calculate_produce_latency_max(rd_kafka_t *rk, |
163 | | rd_kafka_broker_t *rkb_selected, |
164 | 0 | rd_ts_t now_ns) { |
165 | 0 | rd_kafka_telemetry_metric_value_t max_rtt = RD_ZERO_INIT; |
166 | 0 | brokers_max(rk, rkb_avg_produce_latency, THREE_ORDERS_MAGNITUDE, |
167 | 0 | max_rtt); |
168 | 0 | return max_rtt; |
169 | 0 | } |
170 | | |
171 | | static rd_kafka_telemetry_metric_value_t |
172 | | calculate_throttle_avg(rd_kafka_t *rk, |
173 | | rd_kafka_broker_t *rkb_selected, |
174 | 0 | rd_ts_t now_ns) { |
175 | 0 | rd_kafka_telemetry_metric_value_t avg_throttle; |
176 | 0 | brokers_avg(rk, rkb_avg_throttle, 1, avg_throttle); |
177 | 0 | return avg_throttle; |
178 | 0 | } |
179 | | |
180 | | |
181 | | static rd_kafka_telemetry_metric_value_t |
182 | | calculate_throttle_max(rd_kafka_t *rk, |
183 | | rd_kafka_broker_t *rkb_selected, |
184 | 0 | rd_ts_t now_ns) { |
185 | 0 | rd_kafka_telemetry_metric_value_t max_throttle; |
186 | 0 | brokers_max(rk, rkb_avg_throttle, 1, max_throttle); |
187 | 0 | return max_throttle; |
188 | 0 | } |
189 | | |
190 | | static rd_kafka_telemetry_metric_value_t |
191 | | calculate_queue_time_avg(rd_kafka_t *rk, |
192 | | rd_kafka_broker_t *rkb_selected, |
193 | 0 | rd_ts_t now_ns) { |
194 | 0 | rd_kafka_telemetry_metric_value_t avg_queue_time; |
195 | 0 | brokers_avg(rk, rkb_avg_outbuf_latency, THREE_ORDERS_MAGNITUDE, |
196 | 0 | avg_queue_time); |
197 | 0 | return avg_queue_time; |
198 | 0 | } |
199 | | |
200 | | static rd_kafka_telemetry_metric_value_t |
201 | | calculate_queue_time_max(rd_kafka_t *rk, |
202 | | rd_kafka_broker_t *rkb_selected, |
203 | 0 | rd_ts_t now_ns) { |
204 | 0 | rd_kafka_telemetry_metric_value_t max_queue_time; |
205 | 0 | brokers_max(rk, rkb_avg_outbuf_latency, THREE_ORDERS_MAGNITUDE, |
206 | 0 | max_queue_time); |
207 | 0 | return max_queue_time; |
208 | 0 | } |
209 | | |
210 | | static rd_kafka_telemetry_metric_value_t |
211 | | calculate_consumer_assigned_partitions(rd_kafka_t *rk, |
212 | | rd_kafka_broker_t *rkb_selected, |
213 | 0 | rd_ts_t now_ns) { |
214 | 0 | rd_kafka_telemetry_metric_value_t assigned_partitions; |
215 | |
|
216 | 0 | assigned_partitions.int_value = |
217 | 0 | rk->rk_cgrp ? rk->rk_cgrp->rkcg_c.assignment_size : 0; |
218 | 0 | return assigned_partitions; |
219 | 0 | } |
220 | | |
221 | | static rd_kafka_telemetry_metric_value_t |
222 | | calculate_consumer_rebalance_latency_avg(rd_kafka_t *rk, |
223 | | rd_kafka_broker_t *rkb_selected, |
224 | 0 | rd_ts_t now_ns) { |
225 | 0 | rd_kafka_telemetry_metric_value_t avg_rebalance_time; |
226 | 0 | avg_rebalance_time.double_value = calculate_avg( |
227 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, |
228 | 0 | THREE_ORDERS_MAGNITUDE); |
229 | 0 | return avg_rebalance_time; |
230 | 0 | } |
231 | | |
232 | | static rd_kafka_telemetry_metric_value_t |
233 | | calculate_consumer_rebalance_latency_max(rd_kafka_t *rk, |
234 | | rd_kafka_broker_t *rkb_selected, |
235 | 0 | rd_ts_t now_ns) { |
236 | 0 | rd_kafka_telemetry_metric_value_t max_rebalance_time; |
237 | 0 | max_rebalance_time.int_value = calculate_max( |
238 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, |
239 | 0 | THREE_ORDERS_MAGNITUDE); |
240 | 0 | return max_rebalance_time; |
241 | 0 | } |
242 | | |
243 | | static rd_kafka_telemetry_metric_value_t |
244 | | calculate_consumer_rebalance_latency_total(rd_kafka_t *rk, |
245 | | rd_kafka_broker_t *rkb_selected, |
246 | 0 | rd_ts_t now_ns) { |
247 | 0 | rd_kafka_telemetry_metric_value_t total_rebalance_time; |
248 | 0 | total_rebalance_time.int_value = RD_CEIL_INTEGER_DIVISION( |
249 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency.ra_v.sum, |
250 | 0 | THREE_ORDERS_MAGNITUDE); |
251 | 0 | if (!rk->rk_telemetry.delta_temporality) { |
252 | 0 | total_rebalance_time.int_value += |
253 | 0 | rk->rk_telemetry.rk_historic_c.rebalance_latency_total; |
254 | 0 | } |
255 | 0 | return total_rebalance_time; |
256 | 0 | } |
257 | | |
258 | | static rd_kafka_telemetry_metric_value_t |
259 | | calculate_consumer_fetch_latency_avg(rd_kafka_t *rk, |
260 | | rd_kafka_broker_t *rkb_selected, |
261 | 0 | rd_ts_t now_ns) { |
262 | 0 | rd_kafka_telemetry_metric_value_t avg_fetch_time; |
263 | 0 | brokers_avg(rk, rkb_avg_fetch_latency, THREE_ORDERS_MAGNITUDE, |
264 | 0 | avg_fetch_time); |
265 | 0 | return avg_fetch_time; |
266 | 0 | } |
267 | | |
268 | | static rd_kafka_telemetry_metric_value_t |
269 | | calculate_consumer_fetch_latency_max(rd_kafka_t *rk, |
270 | | rd_kafka_broker_t *rkb_selected, |
271 | 0 | rd_ts_t now_ns) { |
272 | 0 | rd_kafka_telemetry_metric_value_t max_fetch_time; |
273 | 0 | brokers_max(rk, rkb_avg_fetch_latency, THREE_ORDERS_MAGNITUDE, |
274 | 0 | max_fetch_time); |
275 | 0 | return max_fetch_time; |
276 | 0 | } |
277 | | |
278 | | static rd_kafka_telemetry_metric_value_t |
279 | | calculate_consumer_poll_idle_ratio_avg(rd_kafka_t *rk, |
280 | | rd_kafka_broker_t *rkb_selected, |
281 | 0 | rd_ts_t now_ns) { |
282 | 0 | rd_kafka_telemetry_metric_value_t avg_poll_idle_avg; |
283 | 0 | avg_poll_idle_avg.double_value = calculate_avg( |
284 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, 1e6); |
285 | 0 | return avg_poll_idle_avg; |
286 | 0 | } |
287 | | |
288 | | static rd_kafka_telemetry_metric_value_t |
289 | | calculate_consumer_commit_latency_avg(rd_kafka_t *rk, |
290 | | rd_kafka_broker_t *rkb_selected, |
291 | 0 | rd_ts_t now_ns) { |
292 | 0 | rd_kafka_telemetry_metric_value_t avg_commit_time; |
293 | 0 | avg_commit_time.double_value = calculate_avg( |
294 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, |
295 | 0 | THREE_ORDERS_MAGNITUDE); |
296 | 0 | return avg_commit_time; |
297 | 0 | } |
298 | | |
299 | | static rd_kafka_telemetry_metric_value_t |
300 | | calculate_consumer_commit_latency_max(rd_kafka_t *rk, |
301 | | rd_kafka_broker_t *rkb_selected, |
302 | 0 | rd_ts_t now_ns) { |
303 | 0 | rd_kafka_telemetry_metric_value_t max_commit_time; |
304 | 0 | max_commit_time.int_value = calculate_max( |
305 | 0 | rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, |
306 | 0 | THREE_ORDERS_MAGNITUDE); |
307 | 0 | return max_commit_time; |
308 | 0 | } |
309 | | |
310 | 0 | static void reset_historical_metrics(rd_kafka_t *rk, rd_ts_t now_ns) { |
311 | 0 | rd_kafka_broker_t *rkb; |
312 | |
|
313 | 0 | rk->rk_telemetry.rk_historic_c.ts_last = now_ns; |
314 | 0 | rk->rk_telemetry.rk_historic_c.rebalance_latency_total += |
315 | 0 | RD_CEIL_INTEGER_DIVISION(rk->rk_telemetry.rd_avg_rollover |
316 | 0 | .rk_avg_rebalance_latency.ra_v.sum, |
317 | 0 | THREE_ORDERS_MAGNITUDE); |
318 | |
|
319 | 0 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
320 | 0 | rkb->rkb_telemetry.rkb_historic_c.connects = |
321 | 0 | rd_atomic32_get(&rkb->rkb_c.connects); |
322 | 0 | } |
323 | 0 | } |
324 | | |
325 | | static const rd_kafka_telemetry_metric_value_calculator_t |
326 | | PRODUCER_METRIC_VALUE_CALCULATORS[RD_KAFKA_TELEMETRY_PRODUCER_METRIC__CNT] = |
327 | | { |
328 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_RATE] = |
329 | | &calculate_connection_creation_rate, |
330 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_TOTAL] = |
331 | | &calculate_connection_creation_total, |
332 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_AVG] = |
333 | | &calculate_broker_avg_rtt, |
334 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_MAX] = |
335 | | &calculate_broker_max_rtt, |
336 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_AVG] = |
337 | | &calculate_throttle_avg, |
338 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_MAX] = |
339 | | &calculate_throttle_max, |
340 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_AVG] = |
341 | | &calculate_queue_time_avg, |
342 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_MAX] = |
343 | | &calculate_queue_time_max, |
344 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_LATENCY_AVG] = |
345 | | &calculate_produce_latency_avg, |
346 | | [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_LATENCY_MAX] = |
347 | | &calculate_produce_latency_max, |
348 | | }; |
349 | | |
350 | | static const rd_kafka_telemetry_metric_value_calculator_t |
351 | | CONSUMER_METRIC_VALUE_CALCULATORS[RD_KAFKA_TELEMETRY_CONSUMER_METRIC__CNT] = { |
352 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_RATE] = |
353 | | &calculate_connection_creation_rate, |
354 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_TOTAL] = |
355 | | &calculate_connection_creation_total, |
356 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_AVG] = |
357 | | &calculate_broker_avg_rtt, |
358 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_MAX] = |
359 | | &calculate_broker_max_rtt, |
360 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_ASSIGNED_PARTITIONS] = |
361 | | &calculate_consumer_assigned_partitions, |
362 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_AVG] = |
363 | | &calculate_consumer_rebalance_latency_avg, |
364 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_MAX] = |
365 | | &calculate_consumer_rebalance_latency_max, |
366 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_TOTAL] = |
367 | | &calculate_consumer_rebalance_latency_total, |
368 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_AVG] = |
369 | | &calculate_consumer_fetch_latency_avg, |
370 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_MAX] = |
371 | | &calculate_consumer_fetch_latency_max, |
372 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG] = |
373 | | &calculate_consumer_poll_idle_ratio_avg, |
374 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG] = |
375 | | &calculate_consumer_commit_latency_avg, |
376 | | [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX] = |
377 | | &calculate_consumer_commit_latency_max, |
378 | | }; |
379 | | |
380 | 0 | static const char *get_client_rack(const rd_kafka_t *rk) { |
381 | 0 | return rk->rk_conf.client_rack && |
382 | 0 | RD_KAFKAP_STR_LEN(rk->rk_conf.client_rack) |
383 | 0 | ? (const char *)rk->rk_conf.client_rack->str |
384 | 0 | : NULL; |
385 | 0 | } |
386 | | |
387 | 0 | static const char *get_group_id(const rd_kafka_t *rk) { |
388 | 0 | return rk->rk_conf.group_id_str ? (const char *)rk->rk_conf.group_id_str |
389 | 0 | : NULL; |
390 | 0 | } |
391 | | |
392 | 0 | static const char *get_group_instance_id(const rd_kafka_t *rk) { |
393 | 0 | return rk->rk_conf.group_instance_id |
394 | 0 | ? (const char *)rk->rk_conf.group_instance_id |
395 | 0 | : NULL; |
396 | 0 | } |
397 | | |
398 | 0 | static const char *get_member_id(const rd_kafka_t *rk) { |
399 | 0 | return rk->rk_cgrp && rk->rk_cgrp->rkcg_member_id && |
400 | 0 | rk->rk_cgrp->rkcg_member_id->len > 0 |
401 | 0 | ? (const char *)rk->rk_cgrp->rkcg_member_id->str |
402 | 0 | : NULL; |
403 | 0 | } |
404 | | |
405 | 0 | static const char *get_transactional_id(const rd_kafka_t *rk) { |
406 | 0 | return rk->rk_conf.eos.transactional_id |
407 | 0 | ? (const char *)rk->rk_conf.eos.transactional_id |
408 | 0 | : NULL; |
409 | 0 | } |
410 | | |
411 | | static const rd_kafka_telemetry_attribute_config_t producer_attributes[] = { |
412 | | {"client_rack", get_client_rack}, |
413 | | {"transactional_id", get_transactional_id}, |
414 | | }; |
415 | | |
416 | | static const rd_kafka_telemetry_attribute_config_t consumer_attributes[] = { |
417 | | {"client_rack", get_client_rack}, |
418 | | {"group_id", get_group_id}, |
419 | | {"group_instance_id", get_group_instance_id}, |
420 | | {"member_id", get_member_id}, |
421 | | }; |
422 | | |
423 | | static int |
424 | | count_attributes(rd_kafka_t *rk, |
425 | | const rd_kafka_telemetry_attribute_config_t *configs, |
426 | 0 | int config_count) { |
427 | 0 | int count = 0, i; |
428 | 0 | for (i = 0; i < config_count; ++i) { |
429 | 0 | if (configs[i].getValue(rk)) { |
430 | 0 | count++; |
431 | 0 | } |
432 | 0 | } |
433 | 0 | return count; |
434 | 0 | } |
435 | | |
436 | | static void set_attributes(rd_kafka_t *rk, |
437 | | rd_kafka_telemetry_resource_attribute_t *attributes, |
438 | | const rd_kafka_telemetry_attribute_config_t *configs, |
439 | 0 | int config_count) { |
440 | 0 | int attr_idx = 0, i; |
441 | 0 | for (i = 0; i < config_count; ++i) { |
442 | 0 | const char *value = configs[i].getValue(rk); |
443 | 0 | if (value) { |
444 | 0 | attributes[attr_idx].name = configs[i].name; |
445 | 0 | attributes[attr_idx].value = value; |
446 | 0 | attr_idx++; |
447 | 0 | } |
448 | 0 | } |
449 | 0 | } |
450 | | |
451 | | static int |
452 | | resource_attributes(rd_kafka_t *rk, |
453 | 0 | rd_kafka_telemetry_resource_attribute_t **attributes) { |
454 | 0 | int count = 0; |
455 | 0 | const rd_kafka_telemetry_attribute_config_t *configs; |
456 | 0 | int config_count; |
457 | |
|
458 | 0 | if (rk->rk_type == RD_KAFKA_PRODUCER) { |
459 | 0 | configs = producer_attributes; |
460 | 0 | config_count = RD_ARRAY_SIZE(producer_attributes); |
461 | 0 | } else if (rk->rk_type == RD_KAFKA_CONSUMER) { |
462 | 0 | configs = consumer_attributes; |
463 | 0 | config_count = RD_ARRAY_SIZE(consumer_attributes); |
464 | 0 | } else { |
465 | 0 | *attributes = NULL; |
466 | 0 | return 0; |
467 | 0 | } |
468 | | |
469 | 0 | count = count_attributes(rk, configs, config_count); |
470 | |
|
471 | 0 | if (count == 0) { |
472 | 0 | *attributes = NULL; |
473 | 0 | return 0; |
474 | 0 | } |
475 | | |
476 | 0 | *attributes = |
477 | 0 | rd_malloc(sizeof(rd_kafka_telemetry_resource_attribute_t) * count); |
478 | |
|
479 | 0 | set_attributes(rk, *attributes, configs, config_count); |
480 | |
|
481 | 0 | return count; |
482 | 0 | } |
483 | | |
484 | | static bool |
485 | 0 | encode_string(pb_ostream_t *stream, const pb_field_t *field, void *const *arg) { |
486 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
487 | 0 | return false; |
488 | 0 | return pb_encode_string(stream, (uint8_t *)(*arg), strlen(*arg)); |
489 | 0 | } |
490 | | |
491 | | // TODO: Update to handle multiple data points. |
492 | | static bool encode_number_data_point(pb_ostream_t *stream, |
493 | | const pb_field_t *field, |
494 | 0 | void *const *arg) { |
495 | 0 | opentelemetry_proto_metrics_v1_NumberDataPoint *data_point = |
496 | 0 | (opentelemetry_proto_metrics_v1_NumberDataPoint *)*arg; |
497 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
498 | 0 | return false; |
499 | | |
500 | 0 | return pb_encode_submessage( |
501 | 0 | stream, opentelemetry_proto_metrics_v1_NumberDataPoint_fields, |
502 | 0 | data_point); |
503 | 0 | } |
504 | | |
505 | | static bool |
506 | 0 | encode_metric(pb_ostream_t *stream, const pb_field_t *field, void *const *arg) { |
507 | 0 | rd_kafka_telemetry_metrics_repeated_t *metricArr = |
508 | 0 | (rd_kafka_telemetry_metrics_repeated_t *)*arg; |
509 | 0 | size_t i; |
510 | |
|
511 | 0 | for (i = 0; i < metricArr->count; i++) { |
512 | |
|
513 | 0 | opentelemetry_proto_metrics_v1_Metric *metric = |
514 | 0 | metricArr->metrics[i]; |
515 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
516 | 0 | return false; |
517 | | |
518 | 0 | if (!pb_encode_submessage( |
519 | 0 | stream, opentelemetry_proto_metrics_v1_Metric_fields, |
520 | 0 | metric)) |
521 | 0 | return false; |
522 | 0 | } |
523 | 0 | return true; |
524 | 0 | } |
525 | | |
526 | | static bool encode_scope_metrics(pb_ostream_t *stream, |
527 | | const pb_field_t *field, |
528 | 0 | void *const *arg) { |
529 | 0 | opentelemetry_proto_metrics_v1_ScopeMetrics *scope_metrics = |
530 | 0 | (opentelemetry_proto_metrics_v1_ScopeMetrics *)*arg; |
531 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
532 | 0 | return false; |
533 | | |
534 | 0 | return pb_encode_submessage( |
535 | 0 | stream, opentelemetry_proto_metrics_v1_ScopeMetrics_fields, |
536 | 0 | scope_metrics); |
537 | 0 | } |
538 | | |
539 | | static bool encode_resource_metrics(pb_ostream_t *stream, |
540 | | const pb_field_t *field, |
541 | 0 | void *const *arg) { |
542 | 0 | opentelemetry_proto_metrics_v1_ResourceMetrics *resource_metrics = |
543 | 0 | (opentelemetry_proto_metrics_v1_ResourceMetrics *)*arg; |
544 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
545 | 0 | return false; |
546 | | |
547 | 0 | return pb_encode_submessage( |
548 | 0 | stream, opentelemetry_proto_metrics_v1_ResourceMetrics_fields, |
549 | 0 | resource_metrics); |
550 | 0 | } |
551 | | |
552 | | static bool encode_key_value(pb_ostream_t *stream, |
553 | | const pb_field_t *field, |
554 | 0 | void *const *arg) { |
555 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
556 | 0 | return false; |
557 | 0 | opentelemetry_proto_common_v1_KeyValue *key_value = |
558 | 0 | (opentelemetry_proto_common_v1_KeyValue *)*arg; |
559 | 0 | return pb_encode_submessage( |
560 | 0 | stream, opentelemetry_proto_common_v1_KeyValue_fields, key_value); |
561 | 0 | } |
562 | | |
563 | | static bool encode_key_values(pb_ostream_t *stream, |
564 | | const pb_field_t *field, |
565 | 0 | void *const *arg) { |
566 | 0 | rd_kafka_telemetry_key_values_repeated_t *kv_arr = |
567 | 0 | (rd_kafka_telemetry_key_values_repeated_t *)*arg; |
568 | 0 | size_t i; |
569 | |
|
570 | 0 | for (i = 0; i < kv_arr->count; i++) { |
571 | |
|
572 | 0 | opentelemetry_proto_common_v1_KeyValue *kv = |
573 | 0 | kv_arr->key_values[i]; |
574 | 0 | if (!pb_encode_tag_for_field(stream, field)) |
575 | 0 | return false; |
576 | | |
577 | 0 | if (!pb_encode_submessage( |
578 | 0 | stream, opentelemetry_proto_common_v1_KeyValue_fields, |
579 | 0 | kv)) |
580 | 0 | return false; |
581 | 0 | } |
582 | 0 | return true; |
583 | 0 | } |
584 | | |
585 | | static void free_metrics( |
586 | | opentelemetry_proto_metrics_v1_Metric **metrics, |
587 | | char **metric_names, |
588 | | opentelemetry_proto_metrics_v1_NumberDataPoint **data_points, |
589 | | opentelemetry_proto_common_v1_KeyValue *datapoint_attributes_key_values, |
590 | 0 | size_t count) { |
591 | 0 | size_t i; |
592 | 0 | for (i = 0; i < count; i++) { |
593 | 0 | rd_free(data_points[i]); |
594 | 0 | rd_free(metric_names[i]); |
595 | 0 | rd_free(metrics[i]); |
596 | 0 | } |
597 | 0 | rd_free(data_points); |
598 | 0 | rd_free(metric_names); |
599 | 0 | rd_free(metrics); |
600 | 0 | rd_free(datapoint_attributes_key_values); |
601 | 0 | } |
602 | | |
603 | | static void free_resource_attributes( |
604 | | opentelemetry_proto_common_v1_KeyValue **resource_attributes_key_values, |
605 | | rd_kafka_telemetry_resource_attribute_t *resource_attributes_struct, |
606 | 0 | size_t count) { |
607 | 0 | size_t i; |
608 | 0 | if (count == 0) |
609 | 0 | return; |
610 | 0 | for (i = 0; i < count; i++) |
611 | 0 | rd_free(resource_attributes_key_values[i]); |
612 | 0 | rd_free(resource_attributes_struct); |
613 | 0 | rd_free(resource_attributes_key_values); |
614 | 0 | } |
615 | | |
616 | | static void serialize_Metric( |
617 | | rd_kafka_t *rk, |
618 | | rd_kafka_broker_t *rkb, |
619 | | const rd_kafka_telemetry_metric_info_t *info, |
620 | | opentelemetry_proto_metrics_v1_Metric **metric, |
621 | | opentelemetry_proto_metrics_v1_NumberDataPoint **data_point, |
622 | | opentelemetry_proto_common_v1_KeyValue *data_point_attribute, |
623 | | rd_kafka_telemetry_metric_value_calculator_t metric_value_calculator, |
624 | | char **metric_name, |
625 | | bool is_per_broker, |
626 | 0 | rd_ts_t now_ns) { |
627 | 0 | rd_ts_t ts_last = rk->rk_telemetry.rk_historic_c.ts_last, |
628 | 0 | ts_start = rk->rk_telemetry.rk_historic_c.ts_start; |
629 | 0 | size_t metric_name_len; |
630 | 0 | if (info->is_int) { |
631 | 0 | (*data_point)->which_value = |
632 | 0 | opentelemetry_proto_metrics_v1_NumberDataPoint_as_int_tag; |
633 | 0 | (*data_point)->value.as_int = |
634 | 0 | metric_value_calculator(rk, rkb, now_ns).int_value; |
635 | 0 | } else { |
636 | 0 | (*data_point)->which_value = |
637 | 0 | opentelemetry_proto_metrics_v1_NumberDataPoint_as_double_tag; |
638 | 0 | (*data_point)->value.as_double = |
639 | 0 | metric_value_calculator(rk, rkb, now_ns).double_value; |
640 | 0 | } |
641 | | |
642 | |
|
643 | 0 | (*data_point)->time_unix_nano = now_ns; |
644 | 0 | if (info->type == RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE || |
645 | 0 | (info->type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM && |
646 | 0 | rk->rk_telemetry.delta_temporality)) |
647 | 0 | (*data_point)->start_time_unix_nano = ts_last; |
648 | 0 | else |
649 | 0 | (*data_point)->start_time_unix_nano = ts_start; |
650 | |
|
651 | 0 | if (is_per_broker) { |
652 | 0 | data_point_attribute->key.funcs.encode = &encode_string; |
653 | 0 | data_point_attribute->key.arg = |
654 | 0 | RD_KAFKA_TELEMETRY_METRIC_NODE_ID_ATTRIBUTE; |
655 | 0 | data_point_attribute->has_value = true; |
656 | 0 | data_point_attribute->value.which_value = |
657 | 0 | opentelemetry_proto_common_v1_AnyValue_int_value_tag; |
658 | |
|
659 | 0 | rd_kafka_broker_lock(rkb); |
660 | 0 | data_point_attribute->value.value.int_value = rkb->rkb_nodeid; |
661 | 0 | rd_kafka_broker_unlock(rkb); |
662 | |
|
663 | 0 | (*data_point)->attributes.funcs.encode = &encode_key_value; |
664 | 0 | (*data_point)->attributes.arg = data_point_attribute; |
665 | 0 | } |
666 | | |
667 | |
|
668 | 0 | switch (info->type) { |
669 | | |
670 | 0 | case RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM: { |
671 | 0 | (*metric)->which_data = |
672 | 0 | opentelemetry_proto_metrics_v1_Metric_sum_tag; |
673 | 0 | (*metric)->data.sum.data_points.funcs.encode = |
674 | 0 | &encode_number_data_point; |
675 | 0 | (*metric)->data.sum.data_points.arg = *data_point; |
676 | 0 | (*metric)->data.sum.aggregation_temporality = |
677 | 0 | rk->rk_telemetry.delta_temporality |
678 | 0 | ? opentelemetry_proto_metrics_v1_AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA |
679 | 0 | : opentelemetry_proto_metrics_v1_AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE; |
680 | 0 | (*metric)->data.sum.is_monotonic = true; |
681 | 0 | break; |
682 | 0 | } |
683 | 0 | case RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE: { |
684 | 0 | (*metric)->which_data = |
685 | 0 | opentelemetry_proto_metrics_v1_Metric_gauge_tag; |
686 | 0 | (*metric)->data.gauge.data_points.funcs.encode = |
687 | 0 | &encode_number_data_point; |
688 | 0 | (*metric)->data.gauge.data_points.arg = *data_point; |
689 | 0 | break; |
690 | 0 | } |
691 | 0 | default: |
692 | 0 | rd_assert(!"Unknown metric type"); |
693 | 0 | break; |
694 | 0 | } |
695 | | |
696 | 0 | (*metric)->description.funcs.encode = &encode_string; |
697 | 0 | (*metric)->description.arg = (void *)info->description; |
698 | |
|
699 | 0 | metric_name_len = |
700 | 0 | strlen(RD_KAFKA_TELEMETRY_METRIC_PREFIX) + strlen(info->name) + 1; |
701 | 0 | *metric_name = rd_calloc(1, metric_name_len); |
702 | 0 | rd_snprintf(*metric_name, metric_name_len, "%s%s", |
703 | 0 | RD_KAFKA_TELEMETRY_METRIC_PREFIX, info->name); |
704 | | |
705 | |
|
706 | 0 | (*metric)->name.funcs.encode = &encode_string; |
707 | 0 | (*metric)->name.arg = *metric_name; |
708 | | |
709 | | /* Skipping unit as Java client does the same */ |
710 | 0 | } |
711 | | |
712 | | /** |
713 | | * @brief Encodes the metrics to opentelemetry_proto_metrics_v1_MetricsData and |
714 | | * returns the serialized data. Currently only supports encoding of connection |
715 | | * creation total by default |
716 | | * |
717 | | * @locks none |
718 | | * @locks_acquired rd_kafka_rdlock() |
719 | | * @locality main thread |
720 | | */ |
721 | 0 | rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) { |
722 | 0 | rd_buf_t *rbuf = NULL; |
723 | 0 | rd_kafka_broker_t *rkb; |
724 | 0 | size_t message_size; |
725 | 0 | void *buffer = NULL; |
726 | 0 | pb_ostream_t stream; |
727 | 0 | bool status; |
728 | 0 | char **metric_names; |
729 | 0 | const int *metrics_to_encode = rk->rk_telemetry.matched_metrics; |
730 | 0 | const size_t metrics_to_encode_count = |
731 | 0 | rk->rk_telemetry.matched_metrics_cnt; |
732 | 0 | const rd_kafka_telemetry_metric_info_t *info = |
733 | 0 | RD_KAFKA_TELEMETRY_METRIC_INFO(rk); |
734 | 0 | size_t total_metrics_count = metrics_to_encode_count; |
735 | 0 | size_t i, metric_idx = 0; |
736 | |
|
737 | 0 | if (!metrics_to_encode_count) |
738 | 0 | return rd_buf_new(1, 1); |
739 | | |
740 | 0 | opentelemetry_proto_metrics_v1_MetricsData metrics_data = |
741 | 0 | opentelemetry_proto_metrics_v1_MetricsData_init_zero; |
742 | |
|
743 | 0 | opentelemetry_proto_metrics_v1_ResourceMetrics resource_metrics = |
744 | 0 | opentelemetry_proto_metrics_v1_ResourceMetrics_init_zero; |
745 | |
|
746 | 0 | opentelemetry_proto_metrics_v1_Metric **metrics; |
747 | 0 | opentelemetry_proto_common_v1_KeyValue * |
748 | 0 | *resource_attributes_key_values = NULL; |
749 | 0 | opentelemetry_proto_common_v1_KeyValue |
750 | 0 | *datapoint_attributes_key_values = NULL; |
751 | 0 | opentelemetry_proto_metrics_v1_NumberDataPoint **data_points; |
752 | 0 | rd_kafka_telemetry_metrics_repeated_t metrics_repeated; |
753 | 0 | rd_kafka_telemetry_key_values_repeated_t resource_attributes_repeated; |
754 | 0 | rd_kafka_telemetry_resource_attribute_t *resource_attributes_struct = |
755 | 0 | NULL; |
756 | 0 | rd_ts_t now_ns = rd_uclock() * 1000; |
757 | 0 | rd_kafka_rdlock(rk); |
758 | |
|
759 | 0 | for (i = 0; i < metrics_to_encode_count; i++) { |
760 | 0 | if (info[metrics_to_encode[i]].is_per_broker) { |
761 | 0 | total_metrics_count += rk->rk_broker_cnt.val - 1; |
762 | 0 | } |
763 | 0 | } |
764 | |
|
765 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Serializing metrics"); |
766 | |
|
767 | 0 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
768 | 0 | rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt); |
769 | 0 | rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt, |
770 | 0 | &rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt); |
771 | 0 | rd_avg_destroy( |
772 | 0 | &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency); |
773 | 0 | rd_avg_rollover( |
774 | 0 | &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency, |
775 | 0 | &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency); |
776 | 0 | rd_avg_destroy( |
777 | 0 | &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle); |
778 | 0 | rd_avg_rollover( |
779 | 0 | &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle, |
780 | 0 | &rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle); |
781 | 0 | if (rk->rk_type == RD_KAFKA_CONSUMER) { |
782 | 0 | rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover |
783 | 0 | .rkb_avg_fetch_latency); |
784 | 0 | rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover |
785 | 0 | .rkb_avg_fetch_latency, |
786 | 0 | &rkb->rkb_telemetry.rd_avg_current |
787 | 0 | .rkb_avg_fetch_latency); |
788 | 0 | } else if (rk->rk_type == RD_KAFKA_PRODUCER) { |
789 | 0 | rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover |
790 | 0 | .rkb_avg_produce_latency); |
791 | 0 | rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover |
792 | 0 | .rkb_avg_produce_latency, |
793 | 0 | &rkb->rkb_telemetry.rd_avg_current |
794 | 0 | .rkb_avg_produce_latency); |
795 | 0 | } |
796 | 0 | } |
797 | |
|
798 | 0 | if (rk->rk_type == RD_KAFKA_CONSUMER) { |
799 | 0 | rd_avg_destroy( |
800 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio); |
801 | 0 | rd_avg_rollover( |
802 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, |
803 | 0 | &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio); |
804 | |
|
805 | 0 | rd_avg_destroy( |
806 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency); |
807 | 0 | rd_avg_rollover( |
808 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency, |
809 | 0 | &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency); |
810 | |
|
811 | 0 | rd_avg_destroy( |
812 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency); |
813 | 0 | rd_avg_rollover( |
814 | 0 | &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency, |
815 | 0 | &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency); |
816 | 0 | } |
817 | |
|
818 | 0 | int resource_attributes_count = |
819 | 0 | resource_attributes(rk, &resource_attributes_struct); |
820 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Resource attributes count: %d", |
821 | 0 | resource_attributes_count); |
822 | 0 | if (resource_attributes_count > 0) { |
823 | 0 | resource_attributes_key_values = |
824 | 0 | rd_malloc(sizeof(opentelemetry_proto_common_v1_KeyValue *) * |
825 | 0 | resource_attributes_count); |
826 | 0 | int ind; |
827 | 0 | for (ind = 0; ind < resource_attributes_count; ++ind) { |
828 | 0 | resource_attributes_key_values[ind] = rd_calloc( |
829 | 0 | 1, sizeof(opentelemetry_proto_common_v1_KeyValue)); |
830 | 0 | resource_attributes_key_values[ind]->key.funcs.encode = |
831 | 0 | &encode_string; |
832 | 0 | resource_attributes_key_values[ind]->key.arg = |
833 | 0 | (void *)resource_attributes_struct[ind].name; |
834 | |
|
835 | 0 | resource_attributes_key_values[ind]->has_value = true; |
836 | 0 | resource_attributes_key_values[ind]->value.which_value = |
837 | 0 | opentelemetry_proto_common_v1_AnyValue_string_value_tag; |
838 | 0 | resource_attributes_key_values[ind] |
839 | 0 | ->value.value.string_value.funcs.encode = |
840 | 0 | &encode_string; |
841 | 0 | resource_attributes_key_values[ind] |
842 | 0 | ->value.value.string_value.arg = |
843 | 0 | (void *)resource_attributes_struct[ind].value; |
844 | 0 | } |
845 | 0 | resource_attributes_repeated.key_values = |
846 | 0 | resource_attributes_key_values; |
847 | 0 | resource_attributes_repeated.count = resource_attributes_count; |
848 | 0 | resource_metrics.has_resource = true; |
849 | 0 | resource_metrics.resource.attributes.funcs.encode = |
850 | 0 | &encode_key_values; |
851 | 0 | resource_metrics.resource.attributes.arg = |
852 | 0 | &resource_attributes_repeated; |
853 | 0 | } |
854 | |
|
855 | 0 | opentelemetry_proto_metrics_v1_ScopeMetrics scope_metrics = |
856 | 0 | opentelemetry_proto_metrics_v1_ScopeMetrics_init_zero; |
857 | |
|
858 | 0 | opentelemetry_proto_common_v1_InstrumentationScope |
859 | 0 | instrumentation_scope = |
860 | 0 | opentelemetry_proto_common_v1_InstrumentationScope_init_zero; |
861 | 0 | instrumentation_scope.name.funcs.encode = &encode_string; |
862 | 0 | instrumentation_scope.name.arg = (void *)rd_kafka_name(rk); |
863 | 0 | instrumentation_scope.version.funcs.encode = &encode_string; |
864 | 0 | instrumentation_scope.version.arg = (void *)rd_kafka_version_str(); |
865 | |
|
866 | 0 | scope_metrics.has_scope = true; |
867 | 0 | scope_metrics.scope = instrumentation_scope; |
868 | |
|
869 | 0 | metrics = rd_malloc(sizeof(opentelemetry_proto_metrics_v1_Metric *) * |
870 | 0 | total_metrics_count); |
871 | 0 | data_points = |
872 | 0 | rd_malloc(sizeof(opentelemetry_proto_metrics_v1_NumberDataPoint *) * |
873 | 0 | total_metrics_count); |
874 | 0 | datapoint_attributes_key_values = |
875 | 0 | rd_malloc(sizeof(opentelemetry_proto_common_v1_KeyValue) * |
876 | 0 | total_metrics_count); |
877 | 0 | metric_names = rd_malloc(sizeof(char *) * total_metrics_count); |
878 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", |
879 | 0 | "Total metrics to be encoded count: %" PRIusz, |
880 | 0 | total_metrics_count); |
881 | | |
882 | |
|
883 | 0 | for (i = 0; i < metrics_to_encode_count; i++) { |
884 | |
|
885 | 0 | rd_kafka_telemetry_metric_value_calculator_t |
886 | 0 | metric_value_calculator = |
887 | 0 | (rk->rk_type == RD_KAFKA_PRODUCER) |
888 | 0 | ? PRODUCER_METRIC_VALUE_CALCULATORS |
889 | 0 | [metrics_to_encode[i]] |
890 | 0 | : CONSUMER_METRIC_VALUE_CALCULATORS |
891 | 0 | [metrics_to_encode[i]]; |
892 | 0 | if (info[metrics_to_encode[i]].is_per_broker) { |
893 | 0 | rd_kafka_broker_t *rkb; |
894 | |
|
895 | 0 | TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) { |
896 | 0 | metrics[metric_idx] = rd_calloc( |
897 | 0 | 1, |
898 | 0 | sizeof( |
899 | 0 | opentelemetry_proto_metrics_v1_Metric)); |
900 | 0 | data_points[metric_idx] = rd_calloc( |
901 | 0 | 1, |
902 | 0 | sizeof( |
903 | 0 | opentelemetry_proto_metrics_v1_NumberDataPoint)); |
904 | 0 | serialize_Metric( |
905 | 0 | rk, rkb, &info[metrics_to_encode[i]], |
906 | 0 | &metrics[metric_idx], |
907 | 0 | &data_points[metric_idx], |
908 | 0 | &datapoint_attributes_key_values |
909 | 0 | [metric_idx], |
910 | 0 | metric_value_calculator, |
911 | 0 | &metric_names[metric_idx], true, now_ns); |
912 | 0 | metric_idx++; |
913 | 0 | } |
914 | 0 | continue; |
915 | 0 | } |
916 | | |
917 | 0 | metrics[metric_idx] = |
918 | 0 | rd_calloc(1, sizeof(opentelemetry_proto_metrics_v1_Metric)); |
919 | 0 | data_points[metric_idx] = rd_calloc( |
920 | 0 | 1, sizeof(opentelemetry_proto_metrics_v1_NumberDataPoint)); |
921 | |
|
922 | 0 | serialize_Metric(rk, NULL, &info[metrics_to_encode[i]], |
923 | 0 | &metrics[metric_idx], &data_points[metric_idx], |
924 | 0 | &datapoint_attributes_key_values[metric_idx], |
925 | 0 | metric_value_calculator, |
926 | 0 | &metric_names[metric_idx], false, now_ns); |
927 | 0 | metric_idx++; |
928 | 0 | } |
929 | | |
930 | | /* Send empty metrics blob if no metrics are matched */ |
931 | 0 | if (total_metrics_count > 0) { |
932 | 0 | metrics_repeated.metrics = metrics; |
933 | 0 | metrics_repeated.count = total_metrics_count; |
934 | |
|
935 | 0 | scope_metrics.metrics.funcs.encode = &encode_metric; |
936 | 0 | scope_metrics.metrics.arg = &metrics_repeated; |
937 | | |
938 | |
|
939 | 0 | resource_metrics.scope_metrics.funcs.encode = |
940 | 0 | &encode_scope_metrics; |
941 | 0 | resource_metrics.scope_metrics.arg = &scope_metrics; |
942 | |
|
943 | 0 | metrics_data.resource_metrics.funcs.encode = |
944 | 0 | &encode_resource_metrics; |
945 | 0 | metrics_data.resource_metrics.arg = &resource_metrics; |
946 | 0 | } |
947 | |
|
948 | 0 | status = pb_get_encoded_size( |
949 | 0 | &message_size, opentelemetry_proto_metrics_v1_MetricsData_fields, |
950 | 0 | &metrics_data); |
951 | 0 | if (!status) { |
952 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", |
953 | 0 | "Failed to get encoded size"); |
954 | 0 | goto fail; |
955 | 0 | } |
956 | | |
957 | 0 | rbuf = rd_buf_new(1, message_size); |
958 | 0 | rd_buf_write_ensure(rbuf, message_size, message_size); |
959 | 0 | message_size = rd_buf_get_writable(rbuf, &buffer); |
960 | |
|
961 | 0 | stream = pb_ostream_from_buffer(buffer, message_size); |
962 | 0 | status = pb_encode(&stream, |
963 | 0 | opentelemetry_proto_metrics_v1_MetricsData_fields, |
964 | 0 | &metrics_data); |
965 | |
|
966 | 0 | if (!status) { |
967 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Encoding failed: %s", |
968 | 0 | PB_GET_ERROR(&stream)); |
969 | 0 | rd_buf_destroy_free(rbuf); |
970 | 0 | goto fail; |
971 | 0 | } |
972 | 0 | rd_kafka_dbg(rk, TELEMETRY, "PUSH", |
973 | 0 | "Push Telemetry metrics encoded, size: %" PRIusz, |
974 | 0 | stream.bytes_written); |
975 | 0 | rd_buf_write(rbuf, NULL, stream.bytes_written); |
976 | |
|
977 | 0 | reset_historical_metrics(rk, now_ns); |
978 | |
|
979 | 0 | free_metrics(metrics, metric_names, data_points, |
980 | 0 | datapoint_attributes_key_values, total_metrics_count); |
981 | 0 | free_resource_attributes(resource_attributes_key_values, |
982 | 0 | resource_attributes_struct, |
983 | 0 | resource_attributes_count); |
984 | 0 | rd_kafka_rdunlock(rk); |
985 | |
|
986 | 0 | return rbuf; |
987 | | |
988 | 0 | fail: |
989 | 0 | free_metrics(metrics, metric_names, data_points, |
990 | 0 | datapoint_attributes_key_values, total_metrics_count); |
991 | 0 | free_resource_attributes(resource_attributes_key_values, |
992 | 0 | resource_attributes_struct, |
993 | 0 | resource_attributes_count); |
994 | 0 | rd_kafka_rdunlock(rk); |
995 | |
|
996 | | return NULL; |
997 | 0 | } |