/src/fluent-bit/include/fluent-bit/flb_bucket_queue.h
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2019-2021 The Fluent Bit Authors |
6 | | * Copyright (C) 2015-2018 Treasure Data Inc. |
7 | | * |
8 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
9 | | * you may not use this file except in compliance with the License. |
10 | | * You may obtain a copy of the License at |
11 | | * |
12 | | * http://www.apache.org/licenses/LICENSE-2.0 |
13 | | * |
14 | | * Unless required by applicable law or agreed to in writing, software |
15 | | * distributed under the License is distributed on an "AS IS" BASIS, |
16 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
17 | | * See the License for the specific language governing permissions and |
18 | | * limitations under the License. |
19 | | */ |
20 | | |
21 | | /* |
22 | | * Note: This implementation can handle priority item removal via |
23 | | * flb_bucket_queue_delete_min(bucket_queue) and direct item removal |
24 | | * via mk_list_del(&item) |
25 | | */ |
26 | | |
27 | | #ifndef FLB_BUCKET_QUEUE_H_ |
28 | | #define FLB_BUCKET_QUEUE_H_ |
29 | | |
30 | | #include <stddef.h> |
31 | | #include <fluent-bit/flb_mem.h> |
32 | | #include <monkey/mk_core/mk_list.h> |
33 | | |
34 | | |
35 | | struct flb_bucket_queue |
36 | | { |
37 | | struct mk_list *buckets; |
38 | | size_t n_buckets; |
39 | | struct mk_list *top; |
40 | | size_t n_items; |
41 | | }; |
42 | | |
43 | | static inline struct flb_bucket_queue *flb_bucket_queue_create(size_t priorities) |
44 | 92 | { |
45 | 92 | size_t i; |
46 | 92 | struct flb_bucket_queue *bucket_queue; |
47 | | |
48 | 92 | bucket_queue = (struct flb_bucket_queue *) |
49 | 92 | flb_malloc(sizeof(struct flb_bucket_queue)); |
50 | 92 | if (!bucket_queue) { |
51 | 0 | return NULL; |
52 | 0 | } |
53 | 92 | bucket_queue->buckets = (struct mk_list *) |
54 | 92 | flb_malloc(sizeof(struct mk_list) *priorities); |
55 | 92 | if (!bucket_queue->buckets) { |
56 | 0 | flb_free(bucket_queue); |
57 | 0 | return NULL; |
58 | 0 | } |
59 | 828 | for (i = 0; i < priorities; ++i) { |
60 | 736 | mk_list_init(&bucket_queue->buckets[i]); |
61 | 736 | } |
62 | 92 | bucket_queue->n_buckets = priorities; |
63 | 92 | bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */ |
64 | 92 | bucket_queue->n_items = 0; |
65 | 92 | return bucket_queue; |
66 | 92 | } Unexecuted instantiation: flb_config.c:flb_bucket_queue_create Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_create flb_output_thread.c:flb_bucket_queue_create Line | Count | Source | 44 | 45 | { | 45 | 45 | size_t i; | 46 | 45 | struct flb_bucket_queue *bucket_queue; | 47 | | | 48 | 45 | bucket_queue = (struct flb_bucket_queue *) | 49 | 45 | flb_malloc(sizeof(struct flb_bucket_queue)); | 50 | 45 | if (!bucket_queue) { | 51 | 0 | return NULL; | 52 | 0 | } | 53 | 45 | bucket_queue->buckets = (struct mk_list *) | 54 | 45 | flb_malloc(sizeof(struct mk_list) *priorities); | 55 | 45 | if (!bucket_queue->buckets) { | 56 | 0 | flb_free(bucket_queue); | 57 | 0 | return NULL; | 58 | 0 | } | 59 | 405 | for (i = 0; i < priorities; ++i) { | 60 | 360 | mk_list_init(&bucket_queue->buckets[i]); | 61 | 360 | } | 62 | 45 | bucket_queue->n_buckets = priorities; | 63 | 45 | bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */ | 64 | 45 | bucket_queue->n_items = 0; | 65 | 45 | return bucket_queue; | 66 | 45 | } |
flb_engine.c:flb_bucket_queue_create Line | Count | Source | 44 | 47 | { | 45 | 47 | size_t i; | 46 | 47 | struct flb_bucket_queue *bucket_queue; | 47 | | | 48 | 47 | bucket_queue = (struct flb_bucket_queue *) | 49 | 47 | flb_malloc(sizeof(struct flb_bucket_queue)); | 50 | 47 | if (!bucket_queue) { | 51 | 0 | return NULL; | 52 | 0 | } | 53 | 47 | bucket_queue->buckets = (struct mk_list *) | 54 | 47 | flb_malloc(sizeof(struct mk_list) *priorities); | 55 | 47 | if (!bucket_queue->buckets) { | 56 | 0 | flb_free(bucket_queue); | 57 | 0 | return NULL; | 58 | 0 | } | 59 | 423 | for (i = 0; i < priorities; ++i) { | 60 | 376 | mk_list_init(&bucket_queue->buckets[i]); | 61 | 376 | } | 62 | 47 | bucket_queue->n_buckets = priorities; | 63 | 47 | bucket_queue->top = (bucket_queue->buckets + bucket_queue->n_buckets); /* one past the last element */ | 64 | 47 | bucket_queue->n_items = 0; | 65 | 47 | return bucket_queue; | 66 | 47 | } |
|
67 | | |
68 | | static inline int flb_bucket_queue_is_empty(struct flb_bucket_queue *bucket_queue) |
69 | 28.5M | { |
70 | 28.5M | return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets); |
71 | 28.5M | } flb_config.c:flb_bucket_queue_is_empty Line | Count | Source | 69 | 90 | { | 70 | 90 | return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets); | 71 | 90 | } |
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_is_empty flb_output_thread.c:flb_bucket_queue_is_empty Line | Count | Source | 69 | 2.45k | { | 70 | 2.45k | return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets); | 71 | 2.45k | } |
flb_engine.c:flb_bucket_queue_is_empty Line | Count | Source | 69 | 28.5M | { | 70 | 28.5M | return bucket_queue->top == (bucket_queue->buckets + bucket_queue->n_buckets); | 71 | 28.5M | } |
|
72 | | |
73 | 14.2M | static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) { |
74 | 19.6M | while (!flb_bucket_queue_is_empty(bucket_queue) |
75 | 13.3M | && (mk_list_is_empty(bucket_queue->top) == 0)) { |
76 | 5.33M | ++bucket_queue->top; |
77 | 5.33M | } |
78 | 14.2M | } flb_config.c:flb_bucket_queue_seek Line | Count | Source | 73 | 45 | static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) { | 74 | 45 | while (!flb_bucket_queue_is_empty(bucket_queue) | 75 | 0 | && (mk_list_is_empty(bucket_queue->top) == 0)) { | 76 | 0 | ++bucket_queue->top; | 77 | 0 | } | 78 | 45 | } |
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_seek flb_output_thread.c:flb_bucket_queue_seek Line | Count | Source | 73 | 926 | static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) { | 74 | 1.82k | while (!flb_bucket_queue_is_empty(bucket_queue) | 75 | 1.34k | && (mk_list_is_empty(bucket_queue->top) == 0)) { | 76 | 898 | ++bucket_queue->top; | 77 | 898 | } | 78 | 926 | } |
flb_engine.c:flb_bucket_queue_seek Line | Count | Source | 73 | 14.2M | static inline void flb_bucket_queue_seek(struct flb_bucket_queue *bucket_queue) { | 74 | 19.6M | while (!flb_bucket_queue_is_empty(bucket_queue) | 75 | 13.3M | && (mk_list_is_empty(bucket_queue->top) == 0)) { | 76 | 5.33M | ++bucket_queue->top; | 77 | 5.33M | } | 78 | 14.2M | } |
|
79 | | |
80 | | static inline int flb_bucket_queue_add(struct flb_bucket_queue *bucket_queue, |
81 | | struct mk_list *item, size_t priority) |
82 | 2.66M | { |
83 | 2.66M | if (priority >= bucket_queue->n_buckets) { |
84 | | /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out " |
85 | | "of priority range", priority); */ |
86 | 0 | return -1; |
87 | 0 | } |
88 | 2.66M | flb_bucket_queue_seek(bucket_queue); |
89 | 2.66M | mk_list_add(item, &bucket_queue->buckets[priority]); |
90 | 2.66M | if (&bucket_queue->buckets[priority] < bucket_queue->top) { |
91 | 2.66M | bucket_queue->top = &bucket_queue->buckets[priority]; |
92 | 2.66M | } |
93 | 2.66M | ++bucket_queue->n_items; |
94 | 2.66M | return 0; |
95 | 2.66M | } Unexecuted instantiation: flb_config.c:flb_bucket_queue_add Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_add flb_output_thread.c:flb_bucket_queue_add Line | Count | Source | 82 | 147 | { | 83 | 147 | if (priority >= bucket_queue->n_buckets) { | 84 | | /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out " | 85 | | "of priority range", priority); */ | 86 | 0 | return -1; | 87 | 0 | } | 88 | 147 | flb_bucket_queue_seek(bucket_queue); | 89 | 147 | mk_list_add(item, &bucket_queue->buckets[priority]); | 90 | 147 | if (&bucket_queue->buckets[priority] < bucket_queue->top) { | 91 | 146 | bucket_queue->top = &bucket_queue->buckets[priority]; | 92 | 146 | } | 93 | 147 | ++bucket_queue->n_items; | 94 | 147 | return 0; | 95 | 147 | } |
flb_engine.c:flb_bucket_queue_add Line | Count | Source | 82 | 2.66M | { | 83 | 2.66M | if (priority >= bucket_queue->n_buckets) { | 84 | | /* flb_error("Error: attempting to add item of priority %zu to bucket_queue out " | 85 | | "of priority range", priority); */ | 86 | 0 | return -1; | 87 | 0 | } | 88 | 2.66M | flb_bucket_queue_seek(bucket_queue); | 89 | 2.66M | mk_list_add(item, &bucket_queue->buckets[priority]); | 90 | 2.66M | if (&bucket_queue->buckets[priority] < bucket_queue->top) { | 91 | 2.66M | bucket_queue->top = &bucket_queue->buckets[priority]; | 92 | 2.66M | } | 93 | 2.66M | ++bucket_queue->n_items; | 94 | 2.66M | return 0; | 95 | 2.66M | } |
|
96 | | |
97 | | /* fifo based on priority */ |
98 | | static inline struct mk_list *flb_bucket_queue_find_min(struct flb_bucket_queue *bucket_queue) |
99 | 6.29M | { |
100 | 6.29M | flb_bucket_queue_seek(bucket_queue); |
101 | 6.29M | if (flb_bucket_queue_is_empty(bucket_queue)) { |
102 | 958k | return NULL; |
103 | 958k | } |
104 | 5.33M | return bucket_queue->top->next; |
105 | 6.29M | } Unexecuted instantiation: flb_config.c:flb_bucket_queue_find_min Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_find_min flb_output_thread.c:flb_bucket_queue_find_min Line | Count | Source | 99 | 440 | { | 100 | 440 | flb_bucket_queue_seek(bucket_queue); | 101 | 440 | if (flb_bucket_queue_is_empty(bucket_queue)) { | 102 | 146 | return NULL; | 103 | 146 | } | 104 | 294 | return bucket_queue->top->next; | 105 | 440 | } |
flb_engine.c:flb_bucket_queue_find_min Line | Count | Source | 99 | 6.29M | { | 100 | 6.29M | flb_bucket_queue_seek(bucket_queue); | 101 | 6.29M | if (flb_bucket_queue_is_empty(bucket_queue)) { | 102 | 958k | return NULL; | 103 | 958k | } | 104 | 5.33M | return bucket_queue->top->next; | 105 | 6.29M | } |
|
106 | | |
107 | | static inline void flb_bucket_queue_delete_min(struct flb_bucket_queue *bucket_queue) |
108 | 2.66M | { |
109 | 2.66M | flb_bucket_queue_seek(bucket_queue); |
110 | 2.66M | if (flb_bucket_queue_is_empty(bucket_queue)) { |
111 | 0 | return; |
112 | 0 | } |
113 | 2.66M | mk_list_del(bucket_queue->top->next); |
114 | 2.66M | flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */ |
115 | 2.66M | --bucket_queue->n_items; |
116 | 2.66M | } Unexecuted instantiation: flb_config.c:flb_bucket_queue_delete_min Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_delete_min flb_output_thread.c:flb_bucket_queue_delete_min Line | Count | Source | 108 | 147 | { | 109 | 147 | flb_bucket_queue_seek(bucket_queue); | 110 | 147 | if (flb_bucket_queue_is_empty(bucket_queue)) { | 111 | 0 | return; | 112 | 0 | } | 113 | 147 | mk_list_del(bucket_queue->top->next); | 114 | 147 | flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */ | 115 | 147 | --bucket_queue->n_items; | 116 | 147 | } |
flb_engine.c:flb_bucket_queue_delete_min Line | Count | Source | 108 | 2.66M | { | 109 | 2.66M | flb_bucket_queue_seek(bucket_queue); | 110 | 2.66M | if (flb_bucket_queue_is_empty(bucket_queue)) { | 111 | 0 | return; | 112 | 0 | } | 113 | 2.66M | mk_list_del(bucket_queue->top->next); | 114 | 2.66M | flb_bucket_queue_seek(bucket_queue); /* this line can be removed. Debugging is harder */ | 115 | 2.66M | --bucket_queue->n_items; | 116 | 2.66M | } |
|
117 | | |
118 | | static inline struct mk_list *flb_bucket_queue_pop_min(struct flb_bucket_queue *bucket_queue) |
119 | 2.66M | { |
120 | 2.66M | struct mk_list *item; |
121 | 2.66M | item = flb_bucket_queue_find_min(bucket_queue); |
122 | 2.66M | flb_bucket_queue_delete_min(bucket_queue); |
123 | 2.66M | return item; |
124 | 2.66M | } Unexecuted instantiation: flb_config.c:flb_bucket_queue_pop_min Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_pop_min flb_output_thread.c:flb_bucket_queue_pop_min Line | Count | Source | 119 | 147 | { | 120 | 147 | struct mk_list *item; | 121 | 147 | item = flb_bucket_queue_find_min(bucket_queue); | 122 | 147 | flb_bucket_queue_delete_min(bucket_queue); | 123 | 147 | return item; | 124 | 147 | } |
flb_engine.c:flb_bucket_queue_pop_min Line | Count | Source | 119 | 2.66M | { | 120 | 2.66M | struct mk_list *item; | 121 | 2.66M | item = flb_bucket_queue_find_min(bucket_queue); | 122 | 2.66M | flb_bucket_queue_delete_min(bucket_queue); | 123 | 2.66M | return item; | 124 | 2.66M | } |
|
125 | | |
126 | | static inline int flb_bucket_queue_destroy( |
127 | | struct flb_bucket_queue *bucket_queue) |
128 | 90 | { |
129 | 90 | flb_bucket_queue_seek(bucket_queue); |
130 | 90 | if (!flb_bucket_queue_is_empty(bucket_queue)) { |
131 | | /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all " |
132 | | "items first."); */ |
133 | 0 | return -1; |
134 | 0 | } |
135 | 90 | flb_free(bucket_queue->buckets); |
136 | 90 | flb_free(bucket_queue); |
137 | 90 | return 0; |
138 | 90 | } flb_config.c:flb_bucket_queue_destroy Line | Count | Source | 128 | 45 | { | 129 | 45 | flb_bucket_queue_seek(bucket_queue); | 130 | 45 | if (!flb_bucket_queue_is_empty(bucket_queue)) { | 131 | | /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all " | 132 | | "items first."); */ | 133 | 0 | return -1; | 134 | 0 | } | 135 | 45 | flb_free(bucket_queue->buckets); | 136 | 45 | flb_free(bucket_queue); | 137 | 45 | return 0; | 138 | 45 | } |
Unexecuted instantiation: flb_input_thread.c:flb_bucket_queue_destroy flb_output_thread.c:flb_bucket_queue_destroy Line | Count | Source | 128 | 45 | { | 129 | 45 | flb_bucket_queue_seek(bucket_queue); | 130 | 45 | if (!flb_bucket_queue_is_empty(bucket_queue)) { | 131 | | /* flb_error("Error: attempting to destroy non empty bucket_queue. Remove all " | 132 | | "items first."); */ | 133 | 0 | return -1; | 134 | 0 | } | 135 | 45 | flb_free(bucket_queue->buckets); | 136 | 45 | flb_free(bucket_queue); | 137 | 45 | return 0; | 138 | 45 | } |
Unexecuted instantiation: flb_engine.c:flb_bucket_queue_destroy |
139 | | |
140 | | #endif /* !FLB_BUCKET_QUEUE_H_ */ |