Coverage Report

Created: 2026-03-09 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/fluent-bit/src/flb_thread_pool.c
Line
Count
Source
1
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3
/*  Fluent Bit
4
 *  ==========
5
 *  Copyright (C) 2015-2026 The Fluent Bit Authors
6
 *
7
 *  Licensed under the Apache License, Version 2.0 (the "License");
8
 *  you may not use this file except in compliance with the License.
9
 *  You may obtain a copy of the License at
10
 *
11
 *      http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 *  Unless required by applicable law or agreed to in writing, software
14
 *  distributed under the License is distributed on an "AS IS" BASIS,
15
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
 *  See the License for the specific language governing permissions and
17
 *  limitations under the License.
18
 */
19
20
#include <fluent-bit/flb_info.h>
21
#include <fluent-bit/flb_mem.h>
22
#include <fluent-bit/flb_log.h>
23
#include <fluent-bit/flb_worker.h>
24
#include <fluent-bit/flb_thread_pool.h>
25
26
/* Return the next thread id. We use the list size to set an id */
27
static int flb_tp_thread_get_id(struct flb_tp *tp)
28
327
{
29
327
    return mk_list_size(&tp->list_threads);
30
327
}
31
32
/* Create a thread manager context */
33
struct flb_tp *flb_tp_create(struct flb_config *config)
34
212
{
35
212
    struct flb_tp *tp;
36
37
212
    tp = flb_calloc(1, sizeof(struct flb_tp));
38
212
    if (!tp) {
39
0
        flb_errno();
40
0
        return NULL;
41
0
    }
42
212
    tp->config = config;
43
212
    mk_list_init(&tp->list_threads);
44
45
212
    return tp;
46
212
}
47
48
void flb_tp_destroy(struct flb_tp *tp)
49
212
{
50
212
    struct mk_list *tmp;
51
212
    struct mk_list *head;
52
212
    struct flb_tp_thread *th;
53
54
327
    mk_list_foreach_safe(head, tmp, &tp->list_threads) {
55
327
        th = mk_list_entry(head, struct flb_tp_thread, _head);
56
327
        mk_list_del(&th->_head);
57
327
        flb_free(th);
58
327
    }
59
60
212
    flb_free(tp);
61
212
}
62
63
struct flb_tp_thread *flb_tp_thread_create(struct flb_tp *tp,
64
                                           void (*func)(void *), void *arg,
65
                                           struct flb_config *config)
66
67
327
{
68
327
    struct flb_tp_thread *th;
69
70
    /* Create thread context */
71
327
    th = flb_calloc(1, sizeof(struct flb_tp_thread));
72
327
    if (!th) {
73
0
        flb_errno();
74
0
        return NULL;
75
0
    }
76
327
    th->config = config;
77
78
    /*
79
     * To spawn a thread, we use the 'worker' interface. Since the worker will
80
     * start the thread as soon as is invoked, we keep a reference to the worker
81
     * parameters in our context and we only use them when the thread is really
82
     * started through the call flb_tp_thread_start().
83
     */
84
327
    th->params.func = func;
85
327
    th->params.data = arg;
86
87
    /* Status */
88
327
    th->status = FLB_THREAD_POOL_NONE;
89
90
    /* Set the thread id */
91
327
    th->id = flb_tp_thread_get_id(tp);
92
93
    /* Link this thread context to the parent context list */
94
327
    mk_list_add(&th->_head, &tp->list_threads);
95
96
327
    return th;
97
327
}
98
99
100
/* Get a candidate thread using round-robin */
101
struct flb_tp_thread *flb_tp_thread_get_rr(struct flb_tp *tp)
102
21
{
103
21
    struct flb_tp_thread *th;
104
105
21
    if (!tp->thread_cur) {
106
21
        th = mk_list_entry_first(&tp->list_threads,
107
21
                                 struct flb_tp_thread, _head);
108
21
    }
109
0
    else {
110
0
        th = mk_list_entry_next(tp->thread_cur,
111
0
                                struct flb_tp_thread, _head,
112
0
                                &tp->list_threads);
113
0
    }
114
21
    tp->thread_cur = &th->_head;
115
116
21
    return th;
117
21
}
118
119
int flb_tp_thread_start(struct flb_tp *tp, struct flb_tp_thread *th)
120
327
{
121
327
    int ret;
122
123
327
    ret = flb_worker_create(th->params.func, th->params.data, &th->tid,
124
327
                            th->config);
125
327
    if (ret == -1) {
126
0
        th->status = FLB_THREAD_POOL_ERROR;
127
0
        return -1;
128
0
    }
129
130
    /*
131
     * Retrieve the Worker context. The worker API don't return the
132
     * id or the context, so we use the created pthread_t (task id)
133
     * to obtain the reference.
134
     */
135
327
    th->worker = flb_worker_lookup(th->tid, tp->config);
136
327
    th->status = FLB_THREAD_POOL_RUNNING;
137
138
327
    return 0;
139
327
}
140
141
int flb_tp_thread_start_id(struct flb_tp *tp, int id)
142
0
{
143
0
    int i = 0;
144
0
    struct mk_list *head;
145
0
    struct flb_tp_thread *th = NULL;
146
147
0
    mk_list_foreach(head, &tp->list_threads) {
148
0
        if (i == id) {
149
0
            th = mk_list_entry(head, struct flb_tp_thread, _head);
150
0
            break;
151
0
        }
152
0
        th = NULL;
153
0
        i++;
154
0
    }
155
156
0
    if (!th) {
157
0
        return -1;
158
0
    }
159
160
0
    return flb_tp_thread_start(tp, th);
161
0
}
162
163
int flb_tp_thread_start_all(struct flb_tp *tp)
164
212
{
165
212
    struct mk_list *head;
166
212
    struct flb_tp_thread *th;
167
168
327
    mk_list_foreach(head, &tp->list_threads) {
169
327
        th = mk_list_entry(head, struct flb_tp_thread, _head);
170
327
        flb_tp_thread_start(tp, th);
171
327
    }
172
173
212
    return 0;
174
212
}
175
176
/*
 * Placeholder for stopping a single thread: no action is performed, both
 * arguments are ignored and 0 is always returned.
 */
int flb_tp_thread_stop(struct flb_tp *tp, struct flb_tp_thread *th)
{
    (void) tp;
    (void) th;

    return 0;
}
180
181
int flb_tp_thread_stop_all(struct flb_tp *tp)
182
0
{
183
0
    int ret;
184
0
    struct mk_list *head;
185
0
    struct flb_tp_thread *th;
186
187
    /*
188
     * Iterate each worker thread, signal them to stop working
189
     * and wait a proper exit.
190
     */
191
0
    mk_list_foreach(head, &tp->list_threads) {
192
0
        th = mk_list_entry(head, struct flb_tp_thread, _head);
193
0
        if (th->status != FLB_THREAD_POOL_RUNNING) {
194
0
            continue;
195
0
        }
196
197
0
        ret = flb_tp_thread_stop(tp, th);
198
0
        if (ret == -1) {
199
200
0
        }
201
0
    }
202
203
0
    return 0;
204
0
}
205
206
/*
 * Placeholder for destroying a thread context: no action is performed and
 * 0 is always returned.
 *
 * The parameter list is spelled '(void)' so the definition provides a real
 * prototype; an empty '()' leaves the arguments unchecked by the compiler.
 */
int flb_tp_thread_destroy(void)
{
    return 0;
}