Coverage Report

Created: 2026-06-20 07:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/lib/librdkafka-2.10.1/src/rdkafka_telemetry_encode.c
Line
Count
Source
1
/*
2
 * librdkafka - Apache Kafka C library
3
 *
4
 * Copyright (c) 2023, Confluent Inc.
5
 * All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions are met:
9
 *
10
 * 1. Redistributions of source code must retain the above copyright notice,
11
 *    this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright notice,
13
 *    this list of conditions and the following disclaimer in the documentation
14
 *    and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26
 * POSSIBILITY OF SUCH DAMAGE.
27
 */
28
29
#include "rdkafka_telemetry_encode.h"
30
#include "nanopb/pb_encode.h"
31
#include "opentelemetry/metrics.pb.h"
32
33
#define THREE_ORDERS_MAGNITUDE 1000
34
35
typedef struct {
36
        opentelemetry_proto_metrics_v1_Metric **metrics;
37
        size_t count;
38
} rd_kafka_telemetry_metrics_repeated_t;
39
40
typedef struct {
41
        opentelemetry_proto_common_v1_KeyValue **key_values;
42
        size_t count;
43
} rd_kafka_telemetry_key_values_repeated_t;
44
45
#define calculate_avg(_avg_, _scale_factor_)                                   \
46
0
        ((_avg_).ra_v.avg / (double)_scale_factor_)
47
48
#define calculate_max(_avg_, _scale_factor_)                                   \
49
0
        RD_CEIL_INTEGER_DIVISION((_avg_).ra_v.maxv, _scale_factor_)
50
51
#define brokers_avg(_rk_, _avg_name_, _scale_factor_, _metric_)                \
52
0
        do {                                                                   \
53
0
                rd_kafka_broker_t *_rkb_;                                      \
54
0
                double avg = 0;                                                \
55
0
                int count  = 0;                                                \
56
0
                TAILQ_FOREACH(_rkb_, &(_rk_)->rk_brokers, rkb_link) {          \
57
0
                        rd_avg_t *rd_avg_rollover =                            \
58
0
                            &_rkb_->rkb_telemetry.rd_avg_rollover._avg_name_;  \
59
0
                        if (rd_avg_rollover->ra_v.cnt) {                       \
60
0
                                avg = (avg * count +                           \
61
0
                                       rd_avg_rollover->ra_v.sum) /            \
62
0
                                      (double)(count +                         \
63
0
                                               rd_avg_rollover->ra_v.cnt);     \
64
0
                                count += rd_avg_rollover->ra_v.cnt;            \
65
0
                        }                                                      \
66
0
                }                                                              \
67
0
                if (_scale_factor_ > 1)                                        \
68
0
                        (_metric_).double_value = avg / _scale_factor_;        \
69
0
                else                                                           \
70
0
                        (_metric_).double_value = avg;                         \
71
0
        } while (0)
72
73
#define brokers_max(_rk_, _avg_name_, _scale_factor_, _metric_)                \
74
0
        do {                                                                   \
75
0
                rd_kafka_broker_t *_rkb_;                                      \
76
0
                _metric_.int_value = 0;                                        \
77
0
                TAILQ_FOREACH(_rkb_, &(_rk_)->rk_brokers, rkb_link) {          \
78
0
                        _metric_.int_value =                                   \
79
0
                            RD_MAX(_metric_.int_value,                         \
80
0
                                   _rkb_->rkb_telemetry.rd_avg_rollover        \
81
0
                                       ._avg_name_.ra_v.maxv);                 \
82
0
                }                                                              \
83
0
                if (_scale_factor_ > 1)                                        \
84
0
                        (_metric_).int_value = RD_CEIL_INTEGER_DIVISION(       \
85
0
                            (_metric_).int_value, _scale_factor_);             \
86
0
        } while (0)
87
88
static rd_kafka_telemetry_metric_value_t
89
calculate_connection_creation_total(rd_kafka_t *rk,
90
                                    rd_kafka_broker_t *rkb_selected,
91
0
                                    rd_ts_t now_ns) {
92
0
        rd_kafka_telemetry_metric_value_t total;
93
0
        rd_kafka_broker_t *rkb;
94
95
0
        total.int_value = 0;
96
0
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
97
0
                const int32_t connects = rd_atomic32_get(&rkb->rkb_c.connects);
98
0
                if (!rk->rk_telemetry.delta_temporality)
99
0
                        total.int_value += connects;
100
0
                else
101
0
                        total.int_value +=
102
0
                            connects -
103
0
                            rkb->rkb_telemetry.rkb_historic_c.connects;
104
0
        }
105
106
0
        return total;
107
0
}
108
109
static rd_kafka_telemetry_metric_value_t
110
calculate_connection_creation_rate(rd_kafka_t *rk,
111
                                   rd_kafka_broker_t *rkb_selected,
112
0
                                   rd_ts_t now_ns) {
113
0
        rd_kafka_telemetry_metric_value_t total;
114
0
        rd_kafka_broker_t *rkb;
115
0
        rd_ts_t ts_last = rk->rk_telemetry.rk_historic_c.ts_last;
116
117
0
        total.double_value = 0;
118
0
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
119
0
                total.double_value +=
120
0
                    rd_atomic32_get(&rkb->rkb_c.connects) -
121
0
                    rkb->rkb_telemetry.rkb_historic_c.connects;
122
0
        }
123
0
        double seconds = (now_ns - ts_last) / 1e9;
124
0
        if (seconds > 1.0)
125
0
                total.double_value /= seconds;
126
0
        return total;
127
0
}
128
129
static rd_kafka_telemetry_metric_value_t
130
calculate_broker_avg_rtt(rd_kafka_t *rk,
131
                         rd_kafka_broker_t *rkb_selected,
132
0
                         rd_ts_t now_ns) {
133
0
        rd_kafka_telemetry_metric_value_t avg_rtt = RD_ZERO_INIT;
134
0
        avg_rtt.double_value                      = calculate_avg(
135
0
            rkb_selected->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt,
136
0
            THREE_ORDERS_MAGNITUDE);
137
0
        return avg_rtt;
138
0
}
139
140
static rd_kafka_telemetry_metric_value_t
141
calculate_broker_max_rtt(rd_kafka_t *rk,
142
                         rd_kafka_broker_t *rkb_selected,
143
0
                         rd_ts_t now_ns) {
144
0
        rd_kafka_telemetry_metric_value_t max_rtt = RD_ZERO_INIT;
145
0
        max_rtt.int_value                         = calculate_max(
146
0
            rkb_selected->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt,
147
0
            THREE_ORDERS_MAGNITUDE);
148
0
        return max_rtt;
149
0
}
150
151
static rd_kafka_telemetry_metric_value_t
152
calculate_produce_latency_avg(rd_kafka_t *rk,
153
                              rd_kafka_broker_t *rkb_selected,
154
0
                              rd_ts_t now_ns) {
155
0
        rd_kafka_telemetry_metric_value_t avg_rtt = RD_ZERO_INIT;
156
0
        brokers_avg(rk, rkb_avg_produce_latency, THREE_ORDERS_MAGNITUDE,
157
0
                    avg_rtt);
158
0
        return avg_rtt;
159
0
}
160
161
static rd_kafka_telemetry_metric_value_t
162
calculate_produce_latency_max(rd_kafka_t *rk,
163
                              rd_kafka_broker_t *rkb_selected,
164
0
                              rd_ts_t now_ns) {
165
0
        rd_kafka_telemetry_metric_value_t max_rtt = RD_ZERO_INIT;
166
0
        brokers_max(rk, rkb_avg_produce_latency, THREE_ORDERS_MAGNITUDE,
167
0
                    max_rtt);
168
0
        return max_rtt;
169
0
}
170
171
static rd_kafka_telemetry_metric_value_t
172
calculate_throttle_avg(rd_kafka_t *rk,
173
                       rd_kafka_broker_t *rkb_selected,
174
0
                       rd_ts_t now_ns) {
175
0
        rd_kafka_telemetry_metric_value_t avg_throttle;
176
0
        brokers_avg(rk, rkb_avg_throttle, 1, avg_throttle);
177
0
        return avg_throttle;
178
0
}
179
180
181
static rd_kafka_telemetry_metric_value_t
182
calculate_throttle_max(rd_kafka_t *rk,
183
                       rd_kafka_broker_t *rkb_selected,
184
0
                       rd_ts_t now_ns) {
185
0
        rd_kafka_telemetry_metric_value_t max_throttle;
186
0
        brokers_max(rk, rkb_avg_throttle, 1, max_throttle);
187
0
        return max_throttle;
188
0
}
189
190
static rd_kafka_telemetry_metric_value_t
191
calculate_queue_time_avg(rd_kafka_t *rk,
192
                         rd_kafka_broker_t *rkb_selected,
193
0
                         rd_ts_t now_ns) {
194
0
        rd_kafka_telemetry_metric_value_t avg_queue_time;
195
0
        brokers_avg(rk, rkb_avg_outbuf_latency, THREE_ORDERS_MAGNITUDE,
196
0
                    avg_queue_time);
197
0
        return avg_queue_time;
198
0
}
199
200
static rd_kafka_telemetry_metric_value_t
201
calculate_queue_time_max(rd_kafka_t *rk,
202
                         rd_kafka_broker_t *rkb_selected,
203
0
                         rd_ts_t now_ns) {
204
0
        rd_kafka_telemetry_metric_value_t max_queue_time;
205
0
        brokers_max(rk, rkb_avg_outbuf_latency, THREE_ORDERS_MAGNITUDE,
206
0
                    max_queue_time);
207
0
        return max_queue_time;
208
0
}
209
210
static rd_kafka_telemetry_metric_value_t
211
calculate_consumer_assigned_partitions(rd_kafka_t *rk,
212
                                       rd_kafka_broker_t *rkb_selected,
213
0
                                       rd_ts_t now_ns) {
214
0
        rd_kafka_telemetry_metric_value_t assigned_partitions;
215
216
0
        assigned_partitions.int_value =
217
0
            rk->rk_cgrp ? rk->rk_cgrp->rkcg_c.assignment_size : 0;
218
0
        return assigned_partitions;
219
0
}
220
221
static rd_kafka_telemetry_metric_value_t
222
calculate_consumer_rebalance_latency_avg(rd_kafka_t *rk,
223
                                         rd_kafka_broker_t *rkb_selected,
224
0
                                         rd_ts_t now_ns) {
225
0
        rd_kafka_telemetry_metric_value_t avg_rebalance_time;
226
0
        avg_rebalance_time.double_value = calculate_avg(
227
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency,
228
0
            THREE_ORDERS_MAGNITUDE);
229
0
        return avg_rebalance_time;
230
0
}
231
232
static rd_kafka_telemetry_metric_value_t
233
calculate_consumer_rebalance_latency_max(rd_kafka_t *rk,
234
                                         rd_kafka_broker_t *rkb_selected,
235
0
                                         rd_ts_t now_ns) {
236
0
        rd_kafka_telemetry_metric_value_t max_rebalance_time;
237
0
        max_rebalance_time.int_value = calculate_max(
238
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency,
239
0
            THREE_ORDERS_MAGNITUDE);
240
0
        return max_rebalance_time;
241
0
}
242
243
static rd_kafka_telemetry_metric_value_t
244
calculate_consumer_rebalance_latency_total(rd_kafka_t *rk,
245
                                           rd_kafka_broker_t *rkb_selected,
246
0
                                           rd_ts_t now_ns) {
247
0
        rd_kafka_telemetry_metric_value_t total_rebalance_time;
248
0
        total_rebalance_time.int_value = RD_CEIL_INTEGER_DIVISION(
249
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency.ra_v.sum,
250
0
            THREE_ORDERS_MAGNITUDE);
251
0
        if (!rk->rk_telemetry.delta_temporality) {
252
0
                total_rebalance_time.int_value +=
253
0
                    rk->rk_telemetry.rk_historic_c.rebalance_latency_total;
254
0
        }
255
0
        return total_rebalance_time;
256
0
}
257
258
static rd_kafka_telemetry_metric_value_t
259
calculate_consumer_fetch_latency_avg(rd_kafka_t *rk,
260
                                     rd_kafka_broker_t *rkb_selected,
261
0
                                     rd_ts_t now_ns) {
262
0
        rd_kafka_telemetry_metric_value_t avg_fetch_time;
263
0
        brokers_avg(rk, rkb_avg_fetch_latency, THREE_ORDERS_MAGNITUDE,
264
0
                    avg_fetch_time);
265
0
        return avg_fetch_time;
266
0
}
267
268
static rd_kafka_telemetry_metric_value_t
269
calculate_consumer_fetch_latency_max(rd_kafka_t *rk,
270
                                     rd_kafka_broker_t *rkb_selected,
271
0
                                     rd_ts_t now_ns) {
272
0
        rd_kafka_telemetry_metric_value_t max_fetch_time;
273
0
        brokers_max(rk, rkb_avg_fetch_latency, THREE_ORDERS_MAGNITUDE,
274
0
                    max_fetch_time);
275
0
        return max_fetch_time;
276
0
}
277
278
static rd_kafka_telemetry_metric_value_t
279
calculate_consumer_poll_idle_ratio_avg(rd_kafka_t *rk,
280
                                       rd_kafka_broker_t *rkb_selected,
281
0
                                       rd_ts_t now_ns) {
282
0
        rd_kafka_telemetry_metric_value_t avg_poll_idle_avg;
283
0
        avg_poll_idle_avg.double_value = calculate_avg(
284
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio, 1e6);
285
0
        return avg_poll_idle_avg;
286
0
}
287
288
static rd_kafka_telemetry_metric_value_t
289
calculate_consumer_commit_latency_avg(rd_kafka_t *rk,
290
                                      rd_kafka_broker_t *rkb_selected,
291
0
                                      rd_ts_t now_ns) {
292
0
        rd_kafka_telemetry_metric_value_t avg_commit_time;
293
0
        avg_commit_time.double_value = calculate_avg(
294
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency,
295
0
            THREE_ORDERS_MAGNITUDE);
296
0
        return avg_commit_time;
297
0
}
298
299
static rd_kafka_telemetry_metric_value_t
300
calculate_consumer_commit_latency_max(rd_kafka_t *rk,
301
                                      rd_kafka_broker_t *rkb_selected,
302
0
                                      rd_ts_t now_ns) {
303
0
        rd_kafka_telemetry_metric_value_t max_commit_time;
304
0
        max_commit_time.int_value = calculate_max(
305
0
            rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency,
306
0
            THREE_ORDERS_MAGNITUDE);
307
0
        return max_commit_time;
308
0
}
309
310
0
static void reset_historical_metrics(rd_kafka_t *rk, rd_ts_t now_ns) {
311
0
        rd_kafka_broker_t *rkb;
312
313
0
        rk->rk_telemetry.rk_historic_c.ts_last = now_ns;
314
0
        rk->rk_telemetry.rk_historic_c.rebalance_latency_total +=
315
0
            RD_CEIL_INTEGER_DIVISION(rk->rk_telemetry.rd_avg_rollover
316
0
                                         .rk_avg_rebalance_latency.ra_v.sum,
317
0
                                     THREE_ORDERS_MAGNITUDE);
318
319
0
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
320
0
                rkb->rkb_telemetry.rkb_historic_c.connects =
321
0
                    rd_atomic32_get(&rkb->rkb_c.connects);
322
0
        }
323
0
}
324
325
static const rd_kafka_telemetry_metric_value_calculator_t
326
    PRODUCER_METRIC_VALUE_CALCULATORS[RD_KAFKA_TELEMETRY_PRODUCER_METRIC__CNT] =
327
        {
328
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_RATE] =
329
                &calculate_connection_creation_rate,
330
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_CONNECTION_CREATION_TOTAL] =
331
                &calculate_connection_creation_total,
332
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_AVG] =
333
                &calculate_broker_avg_rtt,
334
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_NODE_REQUEST_LATENCY_MAX] =
335
                &calculate_broker_max_rtt,
336
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_AVG] =
337
                &calculate_throttle_avg,
338
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_THROTTLE_TIME_MAX] =
339
                &calculate_throttle_max,
340
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_AVG] =
341
                &calculate_queue_time_avg,
342
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_RECORD_QUEUE_TIME_MAX] =
343
                &calculate_queue_time_max,
344
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_LATENCY_AVG] =
345
                &calculate_produce_latency_avg,
346
            [RD_KAFKA_TELEMETRY_METRIC_PRODUCER_PRODUCE_LATENCY_MAX] =
347
                &calculate_produce_latency_max,
348
};
349
350
static const rd_kafka_telemetry_metric_value_calculator_t
351
    CONSUMER_METRIC_VALUE_CALCULATORS[RD_KAFKA_TELEMETRY_CONSUMER_METRIC__CNT] = {
352
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_RATE] =
353
            &calculate_connection_creation_rate,
354
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_CONNECTION_CREATION_TOTAL] =
355
            &calculate_connection_creation_total,
356
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_AVG] =
357
            &calculate_broker_avg_rtt,
358
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_NODE_REQUEST_LATENCY_MAX] =
359
            &calculate_broker_max_rtt,
360
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_ASSIGNED_PARTITIONS] =
361
            &calculate_consumer_assigned_partitions,
362
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_AVG] =
363
            &calculate_consumer_rebalance_latency_avg,
364
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_MAX] =
365
            &calculate_consumer_rebalance_latency_max,
366
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_REBALANCE_LATENCY_TOTAL] =
367
            &calculate_consumer_rebalance_latency_total,
368
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_AVG] =
369
            &calculate_consumer_fetch_latency_avg,
370
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_FETCH_MANAGER_FETCH_LATENCY_MAX] =
371
            &calculate_consumer_fetch_latency_max,
372
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_POLL_IDLE_RATIO_AVG] =
373
            &calculate_consumer_poll_idle_ratio_avg,
374
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_AVG] =
375
            &calculate_consumer_commit_latency_avg,
376
        [RD_KAFKA_TELEMETRY_METRIC_CONSUMER_COORDINATOR_COMMIT_LATENCY_MAX] =
377
            &calculate_consumer_commit_latency_max,
378
};
379
380
0
static const char *get_client_rack(const rd_kafka_t *rk) {
381
0
        return rk->rk_conf.client_rack &&
382
0
                       RD_KAFKAP_STR_LEN(rk->rk_conf.client_rack)
383
0
                   ? (const char *)rk->rk_conf.client_rack->str
384
0
                   : NULL;
385
0
}
386
387
0
static const char *get_group_id(const rd_kafka_t *rk) {
388
0
        return rk->rk_conf.group_id_str ? (const char *)rk->rk_conf.group_id_str
389
0
                                        : NULL;
390
0
}
391
392
0
static const char *get_group_instance_id(const rd_kafka_t *rk) {
393
0
        return rk->rk_conf.group_instance_id
394
0
                   ? (const char *)rk->rk_conf.group_instance_id
395
0
                   : NULL;
396
0
}
397
398
0
static const char *get_member_id(const rd_kafka_t *rk) {
399
0
        return rk->rk_cgrp && rk->rk_cgrp->rkcg_member_id &&
400
0
                       rk->rk_cgrp->rkcg_member_id->len > 0
401
0
                   ? (const char *)rk->rk_cgrp->rkcg_member_id->str
402
0
                   : NULL;
403
0
}
404
405
0
static const char *get_transactional_id(const rd_kafka_t *rk) {
406
0
        return rk->rk_conf.eos.transactional_id
407
0
                   ? (const char *)rk->rk_conf.eos.transactional_id
408
0
                   : NULL;
409
0
}
410
411
static const rd_kafka_telemetry_attribute_config_t producer_attributes[] = {
412
    {"client_rack", get_client_rack},
413
    {"transactional_id", get_transactional_id},
414
};
415
416
static const rd_kafka_telemetry_attribute_config_t consumer_attributes[] = {
417
    {"client_rack", get_client_rack},
418
    {"group_id", get_group_id},
419
    {"group_instance_id", get_group_instance_id},
420
    {"member_id", get_member_id},
421
};
422
423
static int
424
count_attributes(rd_kafka_t *rk,
425
                 const rd_kafka_telemetry_attribute_config_t *configs,
426
0
                 int config_count) {
427
0
        int count = 0, i;
428
0
        for (i = 0; i < config_count; ++i) {
429
0
                if (configs[i].getValue(rk)) {
430
0
                        count++;
431
0
                }
432
0
        }
433
0
        return count;
434
0
}
435
436
static void set_attributes(rd_kafka_t *rk,
437
                           rd_kafka_telemetry_resource_attribute_t *attributes,
438
                           const rd_kafka_telemetry_attribute_config_t *configs,
439
0
                           int config_count) {
440
0
        int attr_idx = 0, i;
441
0
        for (i = 0; i < config_count; ++i) {
442
0
                const char *value = configs[i].getValue(rk);
443
0
                if (value) {
444
0
                        attributes[attr_idx].name  = configs[i].name;
445
0
                        attributes[attr_idx].value = value;
446
0
                        attr_idx++;
447
0
                }
448
0
        }
449
0
}
450
451
static int
452
resource_attributes(rd_kafka_t *rk,
453
0
                    rd_kafka_telemetry_resource_attribute_t **attributes) {
454
0
        int count = 0;
455
0
        const rd_kafka_telemetry_attribute_config_t *configs;
456
0
        int config_count;
457
458
0
        if (rk->rk_type == RD_KAFKA_PRODUCER) {
459
0
                configs      = producer_attributes;
460
0
                config_count = RD_ARRAY_SIZE(producer_attributes);
461
0
        } else if (rk->rk_type == RD_KAFKA_CONSUMER) {
462
0
                configs      = consumer_attributes;
463
0
                config_count = RD_ARRAY_SIZE(consumer_attributes);
464
0
        } else {
465
0
                *attributes = NULL;
466
0
                return 0;
467
0
        }
468
469
0
        count = count_attributes(rk, configs, config_count);
470
471
0
        if (count == 0) {
472
0
                *attributes = NULL;
473
0
                return 0;
474
0
        }
475
476
0
        *attributes =
477
0
            rd_malloc(sizeof(rd_kafka_telemetry_resource_attribute_t) * count);
478
479
0
        set_attributes(rk, *attributes, configs, config_count);
480
481
0
        return count;
482
0
}
483
484
static bool
485
0
encode_string(pb_ostream_t *stream, const pb_field_t *field, void *const *arg) {
486
0
        if (!pb_encode_tag_for_field(stream, field))
487
0
                return false;
488
0
        return pb_encode_string(stream, (uint8_t *)(*arg), strlen(*arg));
489
0
}
490
491
// TODO: Update to handle multiple data points.
492
static bool encode_number_data_point(pb_ostream_t *stream,
493
                                     const pb_field_t *field,
494
0
                                     void *const *arg) {
495
0
        opentelemetry_proto_metrics_v1_NumberDataPoint *data_point =
496
0
            (opentelemetry_proto_metrics_v1_NumberDataPoint *)*arg;
497
0
        if (!pb_encode_tag_for_field(stream, field))
498
0
                return false;
499
500
0
        return pb_encode_submessage(
501
0
            stream, opentelemetry_proto_metrics_v1_NumberDataPoint_fields,
502
0
            data_point);
503
0
}
504
505
static bool
506
0
encode_metric(pb_ostream_t *stream, const pb_field_t *field, void *const *arg) {
507
0
        rd_kafka_telemetry_metrics_repeated_t *metricArr =
508
0
            (rd_kafka_telemetry_metrics_repeated_t *)*arg;
509
0
        size_t i;
510
511
0
        for (i = 0; i < metricArr->count; i++) {
512
513
0
                opentelemetry_proto_metrics_v1_Metric *metric =
514
0
                    metricArr->metrics[i];
515
0
                if (!pb_encode_tag_for_field(stream, field))
516
0
                        return false;
517
518
0
                if (!pb_encode_submessage(
519
0
                        stream, opentelemetry_proto_metrics_v1_Metric_fields,
520
0
                        metric))
521
0
                        return false;
522
0
        }
523
0
        return true;
524
0
}
525
526
static bool encode_scope_metrics(pb_ostream_t *stream,
527
                                 const pb_field_t *field,
528
0
                                 void *const *arg) {
529
0
        opentelemetry_proto_metrics_v1_ScopeMetrics *scope_metrics =
530
0
            (opentelemetry_proto_metrics_v1_ScopeMetrics *)*arg;
531
0
        if (!pb_encode_tag_for_field(stream, field))
532
0
                return false;
533
534
0
        return pb_encode_submessage(
535
0
            stream, opentelemetry_proto_metrics_v1_ScopeMetrics_fields,
536
0
            scope_metrics);
537
0
}
538
539
static bool encode_resource_metrics(pb_ostream_t *stream,
540
                                    const pb_field_t *field,
541
0
                                    void *const *arg) {
542
0
        opentelemetry_proto_metrics_v1_ResourceMetrics *resource_metrics =
543
0
            (opentelemetry_proto_metrics_v1_ResourceMetrics *)*arg;
544
0
        if (!pb_encode_tag_for_field(stream, field))
545
0
                return false;
546
547
0
        return pb_encode_submessage(
548
0
            stream, opentelemetry_proto_metrics_v1_ResourceMetrics_fields,
549
0
            resource_metrics);
550
0
}
551
552
static bool encode_key_value(pb_ostream_t *stream,
553
                             const pb_field_t *field,
554
0
                             void *const *arg) {
555
0
        if (!pb_encode_tag_for_field(stream, field))
556
0
                return false;
557
0
        opentelemetry_proto_common_v1_KeyValue *key_value =
558
0
            (opentelemetry_proto_common_v1_KeyValue *)*arg;
559
0
        return pb_encode_submessage(
560
0
            stream, opentelemetry_proto_common_v1_KeyValue_fields, key_value);
561
0
}
562
563
static bool encode_key_values(pb_ostream_t *stream,
564
                              const pb_field_t *field,
565
0
                              void *const *arg) {
566
0
        rd_kafka_telemetry_key_values_repeated_t *kv_arr =
567
0
            (rd_kafka_telemetry_key_values_repeated_t *)*arg;
568
0
        size_t i;
569
570
0
        for (i = 0; i < kv_arr->count; i++) {
571
572
0
                opentelemetry_proto_common_v1_KeyValue *kv =
573
0
                    kv_arr->key_values[i];
574
0
                if (!pb_encode_tag_for_field(stream, field))
575
0
                        return false;
576
577
0
                if (!pb_encode_submessage(
578
0
                        stream, opentelemetry_proto_common_v1_KeyValue_fields,
579
0
                        kv))
580
0
                        return false;
581
0
        }
582
0
        return true;
583
0
}
584
585
static void free_metrics(
586
    opentelemetry_proto_metrics_v1_Metric **metrics,
587
    char **metric_names,
588
    opentelemetry_proto_metrics_v1_NumberDataPoint **data_points,
589
    opentelemetry_proto_common_v1_KeyValue *datapoint_attributes_key_values,
590
0
    size_t count) {
591
0
        size_t i;
592
0
        for (i = 0; i < count; i++) {
593
0
                rd_free(data_points[i]);
594
0
                rd_free(metric_names[i]);
595
0
                rd_free(metrics[i]);
596
0
        }
597
0
        rd_free(data_points);
598
0
        rd_free(metric_names);
599
0
        rd_free(metrics);
600
0
        rd_free(datapoint_attributes_key_values);
601
0
}
602
603
static void free_resource_attributes(
604
    opentelemetry_proto_common_v1_KeyValue **resource_attributes_key_values,
605
    rd_kafka_telemetry_resource_attribute_t *resource_attributes_struct,
606
0
    size_t count) {
607
0
        size_t i;
608
0
        if (count == 0)
609
0
                return;
610
0
        for (i = 0; i < count; i++)
611
0
                rd_free(resource_attributes_key_values[i]);
612
0
        rd_free(resource_attributes_struct);
613
0
        rd_free(resource_attributes_key_values);
614
0
}
615
616
static void serialize_Metric(
617
    rd_kafka_t *rk,
618
    rd_kafka_broker_t *rkb,
619
    const rd_kafka_telemetry_metric_info_t *info,
620
    opentelemetry_proto_metrics_v1_Metric **metric,
621
    opentelemetry_proto_metrics_v1_NumberDataPoint **data_point,
622
    opentelemetry_proto_common_v1_KeyValue *data_point_attribute,
623
    rd_kafka_telemetry_metric_value_calculator_t metric_value_calculator,
624
    char **metric_name,
625
    bool is_per_broker,
626
0
    rd_ts_t now_ns) {
627
0
        rd_ts_t ts_last  = rk->rk_telemetry.rk_historic_c.ts_last,
628
0
                ts_start = rk->rk_telemetry.rk_historic_c.ts_start;
629
0
        size_t metric_name_len;
630
0
        if (info->is_int) {
631
0
                (*data_point)->which_value =
632
0
                    opentelemetry_proto_metrics_v1_NumberDataPoint_as_int_tag;
633
0
                (*data_point)->value.as_int =
634
0
                    metric_value_calculator(rk, rkb, now_ns).int_value;
635
0
        } else {
636
0
                (*data_point)->which_value =
637
0
                    opentelemetry_proto_metrics_v1_NumberDataPoint_as_double_tag;
638
0
                (*data_point)->value.as_double =
639
0
                    metric_value_calculator(rk, rkb, now_ns).double_value;
640
0
        }
641
642
643
0
        (*data_point)->time_unix_nano = now_ns;
644
0
        if (info->type == RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE ||
645
0
            (info->type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM &&
646
0
             rk->rk_telemetry.delta_temporality))
647
0
                (*data_point)->start_time_unix_nano = ts_last;
648
0
        else
649
0
                (*data_point)->start_time_unix_nano = ts_start;
650
651
0
        if (is_per_broker) {
652
0
                data_point_attribute->key.funcs.encode = &encode_string;
653
0
                data_point_attribute->key.arg =
654
0
                    RD_KAFKA_TELEMETRY_METRIC_NODE_ID_ATTRIBUTE;
655
0
                data_point_attribute->has_value = true;
656
0
                data_point_attribute->value.which_value =
657
0
                    opentelemetry_proto_common_v1_AnyValue_int_value_tag;
658
659
0
                rd_kafka_broker_lock(rkb);
660
0
                data_point_attribute->value.value.int_value = rkb->rkb_nodeid;
661
0
                rd_kafka_broker_unlock(rkb);
662
663
0
                (*data_point)->attributes.funcs.encode = &encode_key_value;
664
0
                (*data_point)->attributes.arg          = data_point_attribute;
665
0
        }
666
667
668
0
        switch (info->type) {
669
670
0
        case RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM: {
671
0
                (*metric)->which_data =
672
0
                    opentelemetry_proto_metrics_v1_Metric_sum_tag;
673
0
                (*metric)->data.sum.data_points.funcs.encode =
674
0
                    &encode_number_data_point;
675
0
                (*metric)->data.sum.data_points.arg = *data_point;
676
0
                (*metric)->data.sum.aggregation_temporality =
677
0
                    rk->rk_telemetry.delta_temporality
678
0
                        ? opentelemetry_proto_metrics_v1_AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA
679
0
                        : opentelemetry_proto_metrics_v1_AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE;
680
0
                (*metric)->data.sum.is_monotonic = true;
681
0
                break;
682
0
        }
683
0
        case RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE: {
684
0
                (*metric)->which_data =
685
0
                    opentelemetry_proto_metrics_v1_Metric_gauge_tag;
686
0
                (*metric)->data.gauge.data_points.funcs.encode =
687
0
                    &encode_number_data_point;
688
0
                (*metric)->data.gauge.data_points.arg = *data_point;
689
0
                break;
690
0
        }
691
0
        default:
692
0
                rd_assert(!"Unknown metric type");
693
0
                break;
694
0
        }
695
696
0
        (*metric)->description.funcs.encode = &encode_string;
697
0
        (*metric)->description.arg          = (void *)info->description;
698
699
0
        metric_name_len =
700
0
            strlen(RD_KAFKA_TELEMETRY_METRIC_PREFIX) + strlen(info->name) + 1;
701
0
        *metric_name = rd_calloc(1, metric_name_len);
702
0
        rd_snprintf(*metric_name, metric_name_len, "%s%s",
703
0
                    RD_KAFKA_TELEMETRY_METRIC_PREFIX, info->name);
704
705
706
0
        (*metric)->name.funcs.encode = &encode_string;
707
0
        (*metric)->name.arg          = *metric_name;
708
709
        /* Skipping unit as Java client does the same */
710
0
}
711
712
/**
713
 * @brief Encodes the metrics to opentelemetry_proto_metrics_v1_MetricsData and
714
 * returns the serialized data. Currently only supports encoding of connection
715
 * creation total by default
716
 *
717
 * @locks none
718
 * @locks_acquired rd_kafka_rdlock()
719
 * @locality main thread
720
 */
721
0
rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
722
0
        rd_buf_t *rbuf = NULL;
723
0
        rd_kafka_broker_t *rkb;
724
0
        size_t message_size;
725
0
        void *buffer = NULL;
726
0
        pb_ostream_t stream;
727
0
        bool status;
728
0
        char **metric_names;
729
0
        const int *metrics_to_encode = rk->rk_telemetry.matched_metrics;
730
0
        const size_t metrics_to_encode_count =
731
0
            rk->rk_telemetry.matched_metrics_cnt;
732
0
        const rd_kafka_telemetry_metric_info_t *info =
733
0
            RD_KAFKA_TELEMETRY_METRIC_INFO(rk);
734
0
        size_t total_metrics_count = metrics_to_encode_count;
735
0
        size_t i, metric_idx = 0;
736
737
0
        if (!metrics_to_encode_count)
738
0
                return rd_buf_new(1, 1);
739
740
0
        opentelemetry_proto_metrics_v1_MetricsData metrics_data =
741
0
            opentelemetry_proto_metrics_v1_MetricsData_init_zero;
742
743
0
        opentelemetry_proto_metrics_v1_ResourceMetrics resource_metrics =
744
0
            opentelemetry_proto_metrics_v1_ResourceMetrics_init_zero;
745
746
0
        opentelemetry_proto_metrics_v1_Metric **metrics;
747
0
        opentelemetry_proto_common_v1_KeyValue *
748
0
            *resource_attributes_key_values = NULL;
749
0
        opentelemetry_proto_common_v1_KeyValue
750
0
            *datapoint_attributes_key_values = NULL;
751
0
        opentelemetry_proto_metrics_v1_NumberDataPoint **data_points;
752
0
        rd_kafka_telemetry_metrics_repeated_t metrics_repeated;
753
0
        rd_kafka_telemetry_key_values_repeated_t resource_attributes_repeated;
754
0
        rd_kafka_telemetry_resource_attribute_t *resource_attributes_struct =
755
0
            NULL;
756
0
        rd_ts_t now_ns = rd_uclock() * 1000;
757
0
        rd_kafka_rdlock(rk);
758
759
0
        for (i = 0; i < metrics_to_encode_count; i++) {
760
0
                if (info[metrics_to_encode[i]].is_per_broker) {
761
0
                        total_metrics_count += rk->rk_broker_cnt.val - 1;
762
0
                }
763
0
        }
764
765
0
        rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Serializing metrics");
766
767
0
        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
768
0
                rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt);
769
0
                rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_rtt,
770
0
                                &rkb->rkb_telemetry.rd_avg_current.rkb_avg_rtt);
771
0
                rd_avg_destroy(
772
0
                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency);
773
0
                rd_avg_rollover(
774
0
                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_outbuf_latency,
775
0
                    &rkb->rkb_telemetry.rd_avg_current.rkb_avg_outbuf_latency);
776
0
                rd_avg_destroy(
777
0
                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle);
778
0
                rd_avg_rollover(
779
0
                    &rkb->rkb_telemetry.rd_avg_rollover.rkb_avg_throttle,
780
0
                    &rkb->rkb_telemetry.rd_avg_current.rkb_avg_throttle);
781
0
                if (rk->rk_type == RD_KAFKA_CONSUMER) {
782
0
                        rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover
783
0
                                            .rkb_avg_fetch_latency);
784
0
                        rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover
785
0
                                             .rkb_avg_fetch_latency,
786
0
                                        &rkb->rkb_telemetry.rd_avg_current
787
0
                                             .rkb_avg_fetch_latency);
788
0
                } else if (rk->rk_type == RD_KAFKA_PRODUCER) {
789
0
                        rd_avg_destroy(&rkb->rkb_telemetry.rd_avg_rollover
790
0
                                            .rkb_avg_produce_latency);
791
0
                        rd_avg_rollover(&rkb->rkb_telemetry.rd_avg_rollover
792
0
                                             .rkb_avg_produce_latency,
793
0
                                        &rkb->rkb_telemetry.rd_avg_current
794
0
                                             .rkb_avg_produce_latency);
795
0
                }
796
0
        }
797
798
0
        if (rk->rk_type == RD_KAFKA_CONSUMER) {
799
0
                rd_avg_destroy(
800
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio);
801
0
                rd_avg_rollover(
802
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_poll_idle_ratio,
803
0
                    &rk->rk_telemetry.rd_avg_current.rk_avg_poll_idle_ratio);
804
805
0
                rd_avg_destroy(
806
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency);
807
0
                rd_avg_rollover(
808
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_rebalance_latency,
809
0
                    &rk->rk_telemetry.rd_avg_current.rk_avg_rebalance_latency);
810
811
0
                rd_avg_destroy(
812
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency);
813
0
                rd_avg_rollover(
814
0
                    &rk->rk_telemetry.rd_avg_rollover.rk_avg_commit_latency,
815
0
                    &rk->rk_telemetry.rd_avg_current.rk_avg_commit_latency);
816
0
        }
817
818
0
        int resource_attributes_count =
819
0
            resource_attributes(rk, &resource_attributes_struct);
820
0
        rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Resource attributes count: %d",
821
0
                     resource_attributes_count);
822
0
        if (resource_attributes_count > 0) {
823
0
                resource_attributes_key_values =
824
0
                    rd_malloc(sizeof(opentelemetry_proto_common_v1_KeyValue *) *
825
0
                              resource_attributes_count);
826
0
                int ind;
827
0
                for (ind = 0; ind < resource_attributes_count; ++ind) {
828
0
                        resource_attributes_key_values[ind] = rd_calloc(
829
0
                            1, sizeof(opentelemetry_proto_common_v1_KeyValue));
830
0
                        resource_attributes_key_values[ind]->key.funcs.encode =
831
0
                            &encode_string;
832
0
                        resource_attributes_key_values[ind]->key.arg =
833
0
                            (void *)resource_attributes_struct[ind].name;
834
835
0
                        resource_attributes_key_values[ind]->has_value = true;
836
0
                        resource_attributes_key_values[ind]->value.which_value =
837
0
                            opentelemetry_proto_common_v1_AnyValue_string_value_tag;
838
0
                        resource_attributes_key_values[ind]
839
0
                            ->value.value.string_value.funcs.encode =
840
0
                            &encode_string;
841
0
                        resource_attributes_key_values[ind]
842
0
                            ->value.value.string_value.arg =
843
0
                            (void *)resource_attributes_struct[ind].value;
844
0
                }
845
0
                resource_attributes_repeated.key_values =
846
0
                    resource_attributes_key_values;
847
0
                resource_attributes_repeated.count = resource_attributes_count;
848
0
                resource_metrics.has_resource      = true;
849
0
                resource_metrics.resource.attributes.funcs.encode =
850
0
                    &encode_key_values;
851
0
                resource_metrics.resource.attributes.arg =
852
0
                    &resource_attributes_repeated;
853
0
        }
854
855
0
        opentelemetry_proto_metrics_v1_ScopeMetrics scope_metrics =
856
0
            opentelemetry_proto_metrics_v1_ScopeMetrics_init_zero;
857
858
0
        opentelemetry_proto_common_v1_InstrumentationScope
859
0
            instrumentation_scope =
860
0
                opentelemetry_proto_common_v1_InstrumentationScope_init_zero;
861
0
        instrumentation_scope.name.funcs.encode    = &encode_string;
862
0
        instrumentation_scope.name.arg             = (void *)rd_kafka_name(rk);
863
0
        instrumentation_scope.version.funcs.encode = &encode_string;
864
0
        instrumentation_scope.version.arg = (void *)rd_kafka_version_str();
865
866
0
        scope_metrics.has_scope = true;
867
0
        scope_metrics.scope     = instrumentation_scope;
868
869
0
        metrics = rd_malloc(sizeof(opentelemetry_proto_metrics_v1_Metric *) *
870
0
                            total_metrics_count);
871
0
        data_points =
872
0
            rd_malloc(sizeof(opentelemetry_proto_metrics_v1_NumberDataPoint *) *
873
0
                      total_metrics_count);
874
0
        datapoint_attributes_key_values =
875
0
            rd_malloc(sizeof(opentelemetry_proto_common_v1_KeyValue) *
876
0
                      total_metrics_count);
877
0
        metric_names = rd_malloc(sizeof(char *) * total_metrics_count);
878
0
        rd_kafka_dbg(rk, TELEMETRY, "PUSH",
879
0
                     "Total metrics to be encoded count: %" PRIusz,
880
0
                     total_metrics_count);
881
882
883
0
        for (i = 0; i < metrics_to_encode_count; i++) {
884
885
0
                rd_kafka_telemetry_metric_value_calculator_t
886
0
                    metric_value_calculator =
887
0
                        (rk->rk_type == RD_KAFKA_PRODUCER)
888
0
                            ? PRODUCER_METRIC_VALUE_CALCULATORS
889
0
                                  [metrics_to_encode[i]]
890
0
                            : CONSUMER_METRIC_VALUE_CALCULATORS
891
0
                                  [metrics_to_encode[i]];
892
0
                if (info[metrics_to_encode[i]].is_per_broker) {
893
0
                        rd_kafka_broker_t *rkb;
894
895
0
                        TAILQ_FOREACH(rkb, &rk->rk_brokers, rkb_link) {
896
0
                                metrics[metric_idx] = rd_calloc(
897
0
                                    1,
898
0
                                    sizeof(
899
0
                                        opentelemetry_proto_metrics_v1_Metric));
900
0
                                data_points[metric_idx] = rd_calloc(
901
0
                                    1,
902
0
                                    sizeof(
903
0
                                        opentelemetry_proto_metrics_v1_NumberDataPoint));
904
0
                                serialize_Metric(
905
0
                                    rk, rkb, &info[metrics_to_encode[i]],
906
0
                                    &metrics[metric_idx],
907
0
                                    &data_points[metric_idx],
908
0
                                    &datapoint_attributes_key_values
909
0
                                        [metric_idx],
910
0
                                    metric_value_calculator,
911
0
                                    &metric_names[metric_idx], true, now_ns);
912
0
                                metric_idx++;
913
0
                        }
914
0
                        continue;
915
0
                }
916
917
0
                metrics[metric_idx] =
918
0
                    rd_calloc(1, sizeof(opentelemetry_proto_metrics_v1_Metric));
919
0
                data_points[metric_idx] = rd_calloc(
920
0
                    1, sizeof(opentelemetry_proto_metrics_v1_NumberDataPoint));
921
922
0
                serialize_Metric(rk, NULL, &info[metrics_to_encode[i]],
923
0
                                 &metrics[metric_idx], &data_points[metric_idx],
924
0
                                 &datapoint_attributes_key_values[metric_idx],
925
0
                                 metric_value_calculator,
926
0
                                 &metric_names[metric_idx], false, now_ns);
927
0
                metric_idx++;
928
0
        }
929
930
        /* Send empty metrics blob if no metrics are matched */
931
0
        if (total_metrics_count > 0) {
932
0
                metrics_repeated.metrics = metrics;
933
0
                metrics_repeated.count   = total_metrics_count;
934
935
0
                scope_metrics.metrics.funcs.encode = &encode_metric;
936
0
                scope_metrics.metrics.arg          = &metrics_repeated;
937
938
939
0
                resource_metrics.scope_metrics.funcs.encode =
940
0
                    &encode_scope_metrics;
941
0
                resource_metrics.scope_metrics.arg = &scope_metrics;
942
943
0
                metrics_data.resource_metrics.funcs.encode =
944
0
                    &encode_resource_metrics;
945
0
                metrics_data.resource_metrics.arg = &resource_metrics;
946
0
        }
947
948
0
        status = pb_get_encoded_size(
949
0
            &message_size, opentelemetry_proto_metrics_v1_MetricsData_fields,
950
0
            &metrics_data);
951
0
        if (!status) {
952
0
                rd_kafka_dbg(rk, TELEMETRY, "PUSH",
953
0
                             "Failed to get encoded size");
954
0
                goto fail;
955
0
        }
956
957
0
        rbuf = rd_buf_new(1, message_size);
958
0
        rd_buf_write_ensure(rbuf, message_size, message_size);
959
0
        message_size = rd_buf_get_writable(rbuf, &buffer);
960
961
0
        stream = pb_ostream_from_buffer(buffer, message_size);
962
0
        status = pb_encode(&stream,
963
0
                           opentelemetry_proto_metrics_v1_MetricsData_fields,
964
0
                           &metrics_data);
965
966
0
        if (!status) {
967
0
                rd_kafka_dbg(rk, TELEMETRY, "PUSH", "Encoding failed: %s",
968
0
                             PB_GET_ERROR(&stream));
969
0
                rd_buf_destroy_free(rbuf);
970
0
                goto fail;
971
0
        }
972
0
        rd_kafka_dbg(rk, TELEMETRY, "PUSH",
973
0
                     "Push Telemetry metrics encoded, size: %" PRIusz,
974
0
                     stream.bytes_written);
975
0
        rd_buf_write(rbuf, NULL, stream.bytes_written);
976
977
0
        reset_historical_metrics(rk, now_ns);
978
979
0
        free_metrics(metrics, metric_names, data_points,
980
0
                     datapoint_attributes_key_values, total_metrics_count);
981
0
        free_resource_attributes(resource_attributes_key_values,
982
0
                                 resource_attributes_struct,
983
0
                                 resource_attributes_count);
984
0
        rd_kafka_rdunlock(rk);
985
986
0
        return rbuf;
987
988
0
fail:
989
0
        free_metrics(metrics, metric_names, data_points,
990
0
                     datapoint_attributes_key_values, total_metrics_count);
991
0
        free_resource_attributes(resource_attributes_key_values,
992
0
                                 resource_attributes_struct,
993
0
                                 resource_attributes_count);
994
0
        rd_kafka_rdunlock(rk);
995
996
        return NULL;
997
0
}