/src/fluent-bit/src/flb_thread_pool.c
Line | Count | Source |
1 | | /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | |
3 | | /* Fluent Bit |
4 | | * ========== |
5 | | * Copyright (C) 2015-2026 The Fluent Bit Authors |
6 | | * |
7 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
8 | | * you may not use this file except in compliance with the License. |
9 | | * You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, software |
14 | | * distributed under the License is distributed on an "AS IS" BASIS, |
15 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | | * See the License for the specific language governing permissions and |
17 | | * limitations under the License. |
18 | | */ |
19 | | |
20 | | #include <fluent-bit/flb_info.h> |
21 | | #include <fluent-bit/flb_mem.h> |
22 | | #include <fluent-bit/flb_log.h> |
23 | | #include <fluent-bit/flb_worker.h> |
24 | | #include <fluent-bit/flb_thread_pool.h> |
25 | | |
26 | | /* Return the next thread id. We use the list size to set an id */ |
27 | | static int flb_tp_thread_get_id(struct flb_tp *tp) |
28 | 327 | { |
29 | 327 | return mk_list_size(&tp->list_threads); |
30 | 327 | } |
31 | | |
32 | | /* Create a thread manager context */ |
33 | | struct flb_tp *flb_tp_create(struct flb_config *config) |
34 | 212 | { |
35 | 212 | struct flb_tp *tp; |
36 | | |
37 | 212 | tp = flb_calloc(1, sizeof(struct flb_tp)); |
38 | 212 | if (!tp) { |
39 | 0 | flb_errno(); |
40 | 0 | return NULL; |
41 | 0 | } |
42 | 212 | tp->config = config; |
43 | 212 | mk_list_init(&tp->list_threads); |
44 | | |
45 | 212 | return tp; |
46 | 212 | } |
47 | | |
48 | | void flb_tp_destroy(struct flb_tp *tp) |
49 | 212 | { |
50 | 212 | struct mk_list *tmp; |
51 | 212 | struct mk_list *head; |
52 | 212 | struct flb_tp_thread *th; |
53 | | |
54 | 327 | mk_list_foreach_safe(head, tmp, &tp->list_threads) { |
55 | 327 | th = mk_list_entry(head, struct flb_tp_thread, _head); |
56 | 327 | mk_list_del(&th->_head); |
57 | 327 | flb_free(th); |
58 | 327 | } |
59 | | |
60 | 212 | flb_free(tp); |
61 | 212 | } |
62 | | |
63 | | struct flb_tp_thread *flb_tp_thread_create(struct flb_tp *tp, |
64 | | void (*func)(void *), void *arg, |
65 | | struct flb_config *config) |
66 | | |
67 | 327 | { |
68 | 327 | struct flb_tp_thread *th; |
69 | | |
70 | | /* Create thread context */ |
71 | 327 | th = flb_calloc(1, sizeof(struct flb_tp_thread)); |
72 | 327 | if (!th) { |
73 | 0 | flb_errno(); |
74 | 0 | return NULL; |
75 | 0 | } |
76 | 327 | th->config = config; |
77 | | |
78 | | /* |
79 | | * To spawn a thread, we use the 'worker' interface. Since the worker will |
80 | | * start the thread as soon as is invoked, we keep a reference to the worker |
81 | | * parameters in our context and we only use them when the thread is really |
82 | | * started through the call flb_tp_thread_start(). |
83 | | */ |
84 | 327 | th->params.func = func; |
85 | 327 | th->params.data = arg; |
86 | | |
87 | | /* Status */ |
88 | 327 | th->status = FLB_THREAD_POOL_NONE; |
89 | | |
90 | | /* Set the thread id */ |
91 | 327 | th->id = flb_tp_thread_get_id(tp); |
92 | | |
93 | | /* Link this thread context to the parent context list */ |
94 | 327 | mk_list_add(&th->_head, &tp->list_threads); |
95 | | |
96 | 327 | return th; |
97 | 327 | } |
98 | | |
99 | | |
100 | | /* Get a candidate thread using round-robin */ |
101 | | struct flb_tp_thread *flb_tp_thread_get_rr(struct flb_tp *tp) |
102 | 21 | { |
103 | 21 | struct flb_tp_thread *th; |
104 | | |
105 | 21 | if (!tp->thread_cur) { |
106 | 21 | th = mk_list_entry_first(&tp->list_threads, |
107 | 21 | struct flb_tp_thread, _head); |
108 | 21 | } |
109 | 0 | else { |
110 | 0 | th = mk_list_entry_next(tp->thread_cur, |
111 | 0 | struct flb_tp_thread, _head, |
112 | 0 | &tp->list_threads); |
113 | 0 | } |
114 | 21 | tp->thread_cur = &th->_head; |
115 | | |
116 | 21 | return th; |
117 | 21 | } |
118 | | |
119 | | int flb_tp_thread_start(struct flb_tp *tp, struct flb_tp_thread *th) |
120 | 327 | { |
121 | 327 | int ret; |
122 | | |
123 | 327 | ret = flb_worker_create(th->params.func, th->params.data, &th->tid, |
124 | 327 | th->config); |
125 | 327 | if (ret == -1) { |
126 | 0 | th->status = FLB_THREAD_POOL_ERROR; |
127 | 0 | return -1; |
128 | 0 | } |
129 | | |
130 | | /* |
131 | | * Retrieve the Worker context. The worker API don't return the |
132 | | * id or the context, so we use the created pthread_t (task id) |
133 | | * to obtain the reference. |
134 | | */ |
135 | 327 | th->worker = flb_worker_lookup(th->tid, tp->config); |
136 | 327 | th->status = FLB_THREAD_POOL_RUNNING; |
137 | | |
138 | 327 | return 0; |
139 | 327 | } |
140 | | |
141 | | int flb_tp_thread_start_id(struct flb_tp *tp, int id) |
142 | 0 | { |
143 | 0 | int i = 0; |
144 | 0 | struct mk_list *head; |
145 | 0 | struct flb_tp_thread *th = NULL; |
146 | |
|
147 | 0 | mk_list_foreach(head, &tp->list_threads) { |
148 | 0 | if (i == id) { |
149 | 0 | th = mk_list_entry(head, struct flb_tp_thread, _head); |
150 | 0 | break; |
151 | 0 | } |
152 | 0 | th = NULL; |
153 | 0 | i++; |
154 | 0 | } |
155 | |
|
156 | 0 | if (!th) { |
157 | 0 | return -1; |
158 | 0 | } |
159 | | |
160 | 0 | return flb_tp_thread_start(tp, th); |
161 | 0 | } |
162 | | |
163 | | int flb_tp_thread_start_all(struct flb_tp *tp) |
164 | 212 | { |
165 | 212 | struct mk_list *head; |
166 | 212 | struct flb_tp_thread *th; |
167 | | |
168 | 327 | mk_list_foreach(head, &tp->list_threads) { |
169 | 327 | th = mk_list_entry(head, struct flb_tp_thread, _head); |
170 | 327 | flb_tp_thread_start(tp, th); |
171 | 327 | } |
172 | | |
173 | 212 | return 0; |
174 | 212 | } |
175 | | |
/*
 * Stop a single thread of the pool.
 *
 * Placeholder: at the moment this performs no action on 'tp' or 'th' and
 * always returns 0.
 */
int flb_tp_thread_stop(struct flb_tp *tp, struct flb_tp_thread *th)
{
    /* Parameters are intentionally unused for now */
    (void) tp;
    (void) th;

    return 0;
}
180 | | |
181 | | int flb_tp_thread_stop_all(struct flb_tp *tp) |
182 | 0 | { |
183 | 0 | int ret; |
184 | 0 | struct mk_list *head; |
185 | 0 | struct flb_tp_thread *th; |
186 | | |
187 | | /* |
188 | | * Iterate each worker thread, signal them to stop working |
189 | | * and wait a proper exit. |
190 | | */ |
191 | 0 | mk_list_foreach(head, &tp->list_threads) { |
192 | 0 | th = mk_list_entry(head, struct flb_tp_thread, _head); |
193 | 0 | if (th->status != FLB_THREAD_POOL_RUNNING) { |
194 | 0 | continue; |
195 | 0 | } |
196 | | |
197 | 0 | ret = flb_tp_thread_stop(tp, th); |
198 | 0 | if (ret == -1) { |
199 | |
|
200 | 0 | } |
201 | 0 | } |
202 | |
|
203 | 0 | return 0; |
204 | 0 | } |
205 | | |
/*
 * Destroy a thread context.
 *
 * Placeholder: takes no arguments, performs no action and always returns 0.
 * The parameter list is spelled '(void)' so the definition is a real
 * prototype; an empty '()' leaves the arguments unspecified before C23.
 */
int flb_tp_thread_destroy(void)
{
    return 0;
}