Coverage Report

Created: 2026-01-21 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/include/fluent-bit/flb_bucket_queue.h
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2019-2021 The Fluent Bit Authors
6
 *  Copyright (C) 2015-2018 Treasure Data Inc.
7
 *
8
 *  Licensed under the Apache License, Version 2.0 (the "License");
9
 *  you may not use this file except in compliance with the License.
10
 *  You may obtain a copy of the License at
11
 *
12
 *      http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 *  Unless required by applicable law or agreed to in writing, software
15
 *  distributed under the License is distributed on an "AS IS" BASIS,
16
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 *  See the License for the specific language governing permissions and
18
 *  limitations under the License.
19
 */
20
21
/*
22
 * Note: This implementation can handle priority item removal via
23
 * flb_bucket_queue_delete_min(bucket_queue) and direct item removal
24
 * via mk_list_del(&item)
25
 */
26
27
#ifndef     FLB_BUCKET_QUEUE_H_
28
#define     FLB_BUCKET_QUEUE_H_
29
30
#include <stddef.h>
31
#include <fluent-bit/flb_mem.h>
32
#include <monkey/mk_core/mk_list.h>
33
34
35
struct flb_bucket_queue
36
{
37
    struct mk_list *buckets;
38
    size_t n_buckets;
39
    struct mk_list *top;
40
    size_t n_items;
41
};
42
43
static inline struct flb_bucket_queue *flb_bucket_queue_create(size_t priorities)
44
92
{
45
92
    size_t i;
46
92
    struct flb_bucket_queue *bucket_queue;
47
48
92
    bucket_queue = (struct flb_bucket_queue *)
49
92
                   flb_malloc(sizeof(struct flb_bucket_queue));
50
92
    if (!bucket_queue) {
51
0
        return NULL;
52
0
    }
53
92
    bucket_queue->buckets = (struct mk_list *)
54
92
                            flb_malloc(sizeof(struct mk_list) *priorities);
55
92
    if (!bucket_queue->buckets) {
56
0
        flb_free(bucket_queue);
57
0
        return NULL;
58
0
    }
59
828
    for (i = 0; i < priorities; ++i) {
60
736
        mk_list_init(&bucket_queue->buckets[i]);
61
736
    }
62
92
    bucket_queue->n_buckets = priorities;
63
92
    bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */
64
92
    bucket_queue->n_items = 0;
65
92
    return bucket_queue;
66
92
}
Unexecuted instantiation: flb_config.c:flb_bucket_queue_create
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_create
flb_output_thread.c:flb_bucket_queue_create
Line
Count
Source
44
45
{
45
45
    size_t i;
46
45
    struct flb_bucket_queue *bucket_queue;
47
48
45
    bucket_queue = (struct flb_bucket_queue *)
49
45
                   flb_malloc(sizeof(struct flb_bucket_queue));
50
45
    if (!bucket_queue) {
51
0
        return NULL;
52
0
    }
53
45
    bucket_queue->buckets = (struct mk_list *)
54
45
                            flb_malloc(sizeof(struct mk_list) *priorities);
55
45
    if (!bucket_queue->buckets) {
56
0
        flb_free(bucket_queue);
57
0
        return NULL;
58
0
    }
59
405
    for (i = 0; i < priorities; ++i) {
60
360
        mk_list_init(&bucket_queue->buckets[i]);
61
360
    }
62
45
    bucket_queue->n_buckets = priorities;
63
45
    bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */
64
45
    bucket_queue->n_items = 0;
65
45
    return bucket_queue;
66
45
}
flb_engine.c:flb_bucket_queue_create
Line
Count
Source
44
47
{
45
47
    size_t i;
46
47
    struct flb_bucket_queue *bucket_queue;
47
48
47
    bucket_queue = (struct flb_bucket_queue *)
49
47
                   flb_malloc(sizeof(struct flb_bucket_queue));
50
47
    if (!bucket_queue) {
51
0
        return NULL;
52
0
    }
53
47
    bucket_queue->buckets = (struct mk_list *)
54
47
                            flb_malloc(sizeof(struct mk_list) *priorities);
55
47
    if (!bucket_queue->buckets) {
56
0
        flb_free(bucket_queue);
57
0
        return NULL;
58
0
    }
59
423
    for (i = 0; i < priorities; ++i) {
60
376
        mk_list_init(&bucket_queue->buckets[i]);
61
376
    }
62
47
    bucket_queue->n_buckets = priorities;
63
47
    bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */
64
47
    bucket_queue->n_items = 0;
65
47
    return bucket_queue;
66
47
}
67
68
static inline int flb_bucket_queue_is_empty(struct flb_bucket_queue *bucket_queue)
69
28.5M
{
70
28.5M
    return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets);
71
28.5M
}
flb_config.c:flb_bucket_queue_is_empty
Line
Count
Source
69
90
{
70
90
    return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets);
71
90
}
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_is_empty
flb_output_thread.c:flb_bucket_queue_is_empty
Line
Count
Source
69
2.45k
{
70
2.45k
    return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets);
71
2.45k
}
flb_engine.c:flb_bucket_queue_is_empty
Line
Count
Source
69
28.5M
{
70
28.5M
    return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets);
71
28.5M
}
72
73
14.2M
static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) {
74
19.6M
    while (!flb_bucket_queue_is_empty(bucket_queue)
75
13.3M
          && (mk_list_is_empty(bucket_queue->top) == 0)) {
76
5.33M
        ++bucket_queue->top;
77
5.33M
    }
78
14.2M
}
flb_config.c:flb_bucket_queue_seek
Line
Count
Source
73
45
static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) {
74
45
    while (!flb_bucket_queue_is_empty(bucket_queue)
75
0
          && (mk_list_is_empty(bucket_queue->top) == 0)) {
76
0
        ++bucket_queue->top;
77
0
    }
78
45
}
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_seek
flb_output_thread.c:flb_bucket_queue_seek
Line
Count
Source
73
926
static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) {
74
1.82k
    while (!flb_bucket_queue_is_empty(bucket_queue)
75
1.34k
          && (mk_list_is_empty(bucket_queue->top) == 0)) {
76
898
        ++bucket_queue->top;
77
898
    }
78
926
}
flb_engine.c:flb_bucket_queue_seek
Line
Count
Source
73
14.2M
static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) {
74
19.6M
    while (!flb_bucket_queue_is_empty(bucket_queue)
75
13.3M
          && (mk_list_is_empty(bucket_queue->top) == 0)) {
76
5.33M
        ++bucket_queue->top;
77
5.33M
    }
78
14.2M
}
79
80
static inline int flb_bucket_queue_add(struct flb_bucket_queue *bucket_queue,
81
                                      struct mk_list *item, size_t priority)
82
2.66M
{
83
2.66M
    if (priority >= bucket_queue->n_buckets) {
84
        /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out "
85
               "of priority range", priority); */
86
0
        return -1;
87
0
    }
88
2.66M
    flb_bucket_queue_seek(bucket_queue);
89
2.66M
    mk_list_add(item, &bucket_queue->buckets[priority]);
90
2.66M
    if (&bucket_queue->buckets[priority] < bucket_queue->top) {
91
2.66M
        bucket_queue->top = &bucket_queue->buckets[priority];
92
2.66M
    }
93
2.66M
    ++bucket_queue->n_items;
94
2.66M
    return 0;
95
2.66M
}
Unexecuted instantiation: flb_config.c:flb_bucket_queue_add
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_add
flb_output_thread.c:flb_bucket_queue_add
Line
Count
Source
82
147
{
83
147
    if (priority >= bucket_queue->n_buckets) {
84
        /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out "
85
               "of priority range", priority); */
86
0
        return -1;
87
0
    }
88
147
    flb_bucket_queue_seek(bucket_queue);
89
147
    mk_list_add(item, &bucket_queue->buckets[priority]);
90
147
    if (&bucket_queue->buckets[priority] < bucket_queue->top) {
91
146
        bucket_queue->top = &bucket_queue->buckets[priority];
92
146
    }
93
147
    ++bucket_queue->n_items;
94
147
    return 0;
95
147
}
flb_engine.c:flb_bucket_queue_add
Line
Count
Source
82
2.66M
{
83
2.66M
    if (priority >= bucket_queue->n_buckets) {
84
        /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out "
85
               "of priority range", priority); */
86
0
        return -1;
87
0
    }
88
2.66M
    flb_bucket_queue_seek(bucket_queue);
89
2.66M
    mk_list_add(item, &bucket_queue->buckets[priority]);
90
2.66M
    if (&bucket_queue->buckets[priority] < bucket_queue->top) {
91
2.66M
        bucket_queue->top = &bucket_queue->buckets[priority];
92
2.66M
    }
93
2.66M
    ++bucket_queue->n_items;
94
2.66M
    return 0;
95
2.66M
}
96
97
/* fifo based on priority */
98
static inline struct mk_list *flb_bucket_queue_find_min(struct flb_bucket_queue *bucket_queue)
99
6.29M
{
100
6.29M
    flb_bucket_queue_seek(bucket_queue);
101
6.29M
    if (flb_bucket_queue_is_empty(bucket_queue)) {
102
958k
        return NULL;
103
958k
    }
104
5.33M
    return bucket_queue->top->next;
105
6.29M
}
Unexecuted instantiation: flb_config.c:flb_bucket_queue_find_min
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_find_min
flb_output_thread.c:flb_bucket_queue_find_min
Line
Count
Source
99
440
{
100
440
    flb_bucket_queue_seek(bucket_queue);
101
440
    if (flb_bucket_queue_is_empty(bucket_queue)) {
102
146
        return NULL;
103
146
    }
104
294
    return bucket_queue->top->next;
105
440
}
flb_engine.c:flb_bucket_queue_find_min
Line
Count
Source
99
6.29M
{
100
6.29M
    flb_bucket_queue_seek(bucket_queue);
101
6.29M
    if (flb_bucket_queue_is_empty(bucket_queue)) {
102
958k
        return NULL;
103
958k
    }
104
5.33M
    return bucket_queue->top->next;
105
6.29M
}
106
107
static inline void flb_bucket_queue_delete_min(struct flb_bucket_queue *bucket_queue)
108
2.66M
{
109
2.66M
    flb_bucket_queue_seek(bucket_queue);
110
2.66M
    if (flb_bucket_queue_is_empty(bucket_queue)) {
111
0
        return;
112
0
    }
113
2.66M
    mk_list_del(bucket_queue->top->next);
114
2.66M
    flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */
115
2.66M
    --bucket_queue->n_items;
116
2.66M
}
Unexecuted instantiation: flb_config.c:flb_bucket_queue_delete_min
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_delete_min
flb_output_thread.c:flb_bucket_queue_delete_min
Line
Count
Source
108
147
{
109
147
    flb_bucket_queue_seek(bucket_queue);
110
147
    if (flb_bucket_queue_is_empty(bucket_queue)) {
111
0
        return;
112
0
    }
113
147
    mk_list_del(bucket_queue->top->next);
114
147
    flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */
115
147
    --bucket_queue->n_items;
116
147
}
flb_engine.c:flb_bucket_queue_delete_min
Line
Count
Source
108
2.66M
{
109
2.66M
    flb_bucket_queue_seek(bucket_queue);
110
2.66M
    if (flb_bucket_queue_is_empty(bucket_queue)) {
111
0
        return;
112
0
    }
113
2.66M
    mk_list_del(bucket_queue->top->next);
114
2.66M
    flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */
115
2.66M
    --bucket_queue->n_items;
116
2.66M
}
117
118
static inline struct mk_list *flb_bucket_queue_pop_min(struct flb_bucket_queue *bucket_queue)
119
2.66M
{
120
2.66M
    struct mk_list *item;
121
2.66M
    item = flb_bucket_queue_find_min(bucket_queue);
122
2.66M
    flb_bucket_queue_delete_min(bucket_queue);
123
2.66M
    return item;
124
2.66M
}
Unexecuted instantiation: flb_config.c:flb_bucket_queue_pop_min
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_pop_min
flb_output_thread.c:flb_bucket_queue_pop_min
Line
Count
Source
119
147
{
120
147
    struct mk_list *item;
121
147
    item = flb_bucket_queue_find_min(bucket_queue);
122
147
    flb_bucket_queue_delete_min(bucket_queue);
123
147
    return item;
124
147
}
flb_engine.c:flb_bucket_queue_pop_min
Line
Count
Source
119
2.66M
{
120
2.66M
    struct mk_list *item;
121
2.66M
    item = flb_bucket_queue_find_min(bucket_queue);
122
2.66M
    flb_bucket_queue_delete_min(bucket_queue);
123
2.66M
    return item;
124
2.66M
}
125
126
static inline int flb_bucket_queue_destroy(
127
                                     struct flb_bucket_queue *bucket_queue)
128
90
{
129
90
    flb_bucket_queue_seek(bucket_queue);
130
90
    if (!flb_bucket_queue_is_empty(bucket_queue)) {
131
        /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all "
132
                  "items first."); */
133
0
        return -1;
134
0
    }
135
90
    flb_free(bucket_queue->buckets);
136
90
    flb_free(bucket_queue);
137
90
    return 0;
138
90
}
flb_config.c:flb_bucket_queue_destroy
Line
Count
Source
128
45
{
129
45
    flb_bucket_queue_seek(bucket_queue);
130
45
    if (!flb_bucket_queue_is_empty(bucket_queue)) {
131
        /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all "
132
                  "items first."); */
133
0
        return -1;
134
0
    }
135
45
    flb_free(bucket_queue->buckets);
136
45
    flb_free(bucket_queue);
137
45
    return 0;
138
45
}
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_destroy
flb_output_thread.c:flb_bucket_queue_destroy
Line
Count
Source
128
45
{
129
45
    flb_bucket_queue_seek(bucket_queue);
130
45
    if (!flb_bucket_queue_is_empty(bucket_queue)) {
131
        /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all "
132
                  "items first."); */
133
0
        return -1;
134
0
    }
135
45
    flb_free(bucket_queue->buckets);
136
45
    flb_free(bucket_queue);
137
45
    return 0;
138
45
}
Unexecuted instantiation: flb_engine.c:flb_bucket_queue_destroy
139
140
#endif /* !FLB_BUCKET_QUEUE_H_ */