Coverage Report

Created: 2026-05-30 06:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/curl/lib/thrdqueue.c
Line
Count
Source
1
/***************************************************************************
2
 *                                  _   _ ____  _
3
 *  Project                     ___| | | |  _ \| |
4
 *                             / __| | | | |_) | |
5
 *                            | (__| |_| |  _ <| |___
6
 *                             \___|\___/|_| \_\_____|
7
 *
8
 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
9
 *
10
 * This software is licensed as described in the file COPYING, which
11
 * you should have received as part of this distribution. The terms
12
 * are also available at https://curl.se/docs/copyright.html.
13
 *
14
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
15
 * copies of the Software, and permit persons to whom the Software is
16
 * furnished to do so, under the terms of the COPYING file.
17
 *
18
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
19
 * KIND, either express or implied.
20
 *
21
 * SPDX-License-Identifier: curl
22
 *
23
 ***************************************************************************/
24
#include "curl_setup.h"
25
26
#ifdef USE_THREADS
27
28
#include "llist.h"
29
#include "curl_threads.h"
30
#include "thrdpool.h"
31
#include "thrdqueue.h"
32
#include "curlx/timeval.h"
33
#ifdef CURLVERBOSE
34
#include "curl_trc.h"
35
#include "urldata.h"
36
#endif
37
38
39
struct curl_thrdq {
40
  char *name;
41
  curl_mutex_t lock;
42
  curl_cond_t await;
43
  struct Curl_llist sendq;
44
  struct Curl_llist recvq;
45
  struct curl_thrdpool *tpool;
46
  Curl_thrdq_item_free_cb *fn_free;
47
  Curl_thrdq_item_process_cb *fn_process;
48
  Curl_thrdq_ev_cb *fn_event;
49
  void *fn_user_data;
50
  uint32_t send_max_len;
51
  BIT(aborted);
52
};
53
54
struct thrdq_item {
55
  struct Curl_llist_node node;
56
  Curl_thrdq_item_free_cb *fn_free;
57
  Curl_thrdq_item_process_cb *fn_process;
58
  void *item;
59
  struct curltime start;
60
  timediff_t timeout_ms;
61
  const char *description;
62
};
63
64
static struct thrdq_item *thrdq_item_create(struct curl_thrdq *tqueue,
65
                                            void *item,
66
                                            const char *description,
67
                                            timediff_t timeout_ms)
68
0
{
69
0
  struct thrdq_item *qitem;
70
71
0
  qitem = curlx_calloc(1, sizeof(*qitem));
72
0
  if(!qitem)
73
0
    return NULL;
74
0
  qitem->item = item;
75
0
  qitem->description = description;
76
0
  qitem->fn_free = tqueue->fn_free;
77
0
  qitem->fn_process = tqueue->fn_process;
78
0
  if(timeout_ms) {
79
0
    qitem->start = curlx_now();
80
0
    qitem->timeout_ms = timeout_ms;
81
0
  }
82
0
  return qitem;
83
0
}
84
85
static void thrdq_item_destroy(struct thrdq_item *qitem)
86
0
{
87
0
  if(qitem->item)
88
0
    qitem->fn_free(qitem->item);
89
0
  curlx_free(qitem);
90
0
}
91
92
/* Element destructor installed on the send and receive lists; `elem` is
 * the struct thrdq_item stored in the list. `user_data` is not used. */
static void thrdq_item_list_dtor(void *user_data, void *elem)
{
  struct thrdq_item *qitem = elem;

  (void)user_data;
  thrdq_item_destroy(qitem);
}
97
98
static void *thrdq_tpool_take(void *user_data, const char **pdescription,
99
                              timediff_t *ptimeout_ms)
100
0
{
101
0
  struct curl_thrdq *tqueue = user_data;
102
0
  struct thrdq_item *qitem = NULL;
103
0
  struct Curl_llist_node *e;
104
0
  Curl_thrdq_ev_cb *fn_event = NULL;
105
0
  void *fn_user_data = NULL;
106
107
0
  Curl_mutex_acquire(&tqueue->lock);
108
0
  *pdescription = NULL;
109
0
  *ptimeout_ms = 0;
110
0
  if(!tqueue->aborted) {
111
0
    e = Curl_llist_head(&tqueue->sendq);
112
0
    if(e) {
113
0
      struct curltime now = curlx_now();
114
0
      timediff_t timeout_ms;
115
0
      while(e) {
116
0
        qitem = Curl_node_take_elem(e);
117
0
        timeout_ms = (!qitem->timeout_ms) ? 0 :
118
0
          (qitem->timeout_ms - curlx_ptimediff_ms(&now, &qitem->start));
119
0
        if(timeout_ms < 0) {
120
          /* timed out while queued, place on receive queue */
121
0
          Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
122
0
          fn_event = tqueue->fn_event;
123
0
          fn_user_data = tqueue->fn_user_data;
124
0
          qitem = NULL;
125
0
          e = Curl_llist_head(&tqueue->sendq);
126
0
          continue;
127
0
        }
128
0
        else {
129
0
          *pdescription = qitem->description;
130
0
          *ptimeout_ms = timeout_ms;
131
0
          break;
132
0
        }
133
0
      }
134
0
    }
135
0
  }
136
0
  Curl_mutex_release(&tqueue->lock);
137
  /* avoiding deadlocks */
138
0
  if(fn_event)
139
0
    fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
140
0
  return qitem;
141
0
}
142
143
static void thrdq_tpool_return(void *item, void *user_data)
144
0
{
145
0
  struct curl_thrdq *tqueue = user_data;
146
0
  struct thrdq_item *qitem = item;
147
0
  Curl_thrdq_ev_cb *fn_event = NULL;
148
0
  void *fn_user_data = NULL;
149
150
0
  if(!tqueue) {
151
0
    thrdq_item_destroy(item);
152
0
    return;
153
0
  }
154
155
0
  Curl_mutex_acquire(&tqueue->lock);
156
0
  if(tqueue->aborted) {
157
0
    thrdq_item_destroy(qitem);
158
0
  }
159
0
  else {
160
0
    DEBUGASSERT(!Curl_node_llist(&qitem->node));
161
0
    Curl_llist_append(&tqueue->recvq, qitem, &qitem->node);
162
0
    fn_event = tqueue->fn_event;
163
0
    fn_user_data = tqueue->fn_user_data;
164
0
  }
165
0
  Curl_mutex_release(&tqueue->lock);
166
  /* avoiding deadlocks */
167
0
  if(fn_event)
168
0
    fn_event(tqueue, CURL_THRDQ_EV_ITEM_DONE, fn_user_data);
169
0
}
170
171
static void thrdq_tpool_process(void *item)
172
0
{
173
0
  struct thrdq_item *qitem = item;
174
0
  qitem->fn_process(qitem->item);
175
0
}
176
177
static void thrdq_unlink(struct curl_thrdq *tqueue, bool locked, bool join)
178
0
{
179
0
  DEBUGASSERT(tqueue->aborted);
180
0
  if(tqueue->tpool) {
181
0
    if(locked)
182
0
      Curl_mutex_release(&tqueue->lock);
183
0
    Curl_thrdpool_destroy(tqueue->tpool, join);
184
0
    tqueue->tpool = NULL;
185
0
    if(locked)
186
0
      Curl_mutex_acquire(&tqueue->lock);
187
0
  }
188
189
0
  Curl_llist_destroy(&tqueue->sendq, NULL);
190
0
  Curl_llist_destroy(&tqueue->recvq, NULL);
191
0
  curlx_free(tqueue->name);
192
0
  Curl_cond_destroy(&tqueue->await);
193
0
  if(locked)
194
0
    Curl_mutex_release(&tqueue->lock);
195
0
  Curl_mutex_destroy(&tqueue->lock);
196
0
  curlx_free(tqueue);
197
0
}
198
199
CURLcode Curl_thrdq_create(struct curl_thrdq **ptqueue,
200
                           const char *name,
201
                           uint32_t max_len,
202
                           uint32_t min_threads,
203
                           uint32_t max_threads,
204
                           uint32_t idle_time_ms,
205
                           Curl_thrdq_item_free_cb *fn_free,
206
                           Curl_thrdq_item_process_cb *fn_process,
207
                           Curl_thrdq_ev_cb *fn_event,
208
                           void *user_data)
209
0
{
210
0
  struct curl_thrdq *tqueue;
211
0
  CURLcode result = CURLE_OUT_OF_MEMORY;
212
213
0
  tqueue = curlx_calloc(1, sizeof(*tqueue));
214
0
  if(!tqueue)
215
0
    goto out;
216
217
0
  Curl_mutex_init(&tqueue->lock);
218
0
  Curl_cond_init(&tqueue->await);
219
0
  Curl_llist_init(&tqueue->sendq, thrdq_item_list_dtor);
220
0
  Curl_llist_init(&tqueue->recvq, thrdq_item_list_dtor);
221
0
  tqueue->fn_free = fn_free;
222
0
  tqueue->fn_process = fn_process;
223
0
  tqueue->fn_event = fn_event;
224
0
  tqueue->fn_user_data = user_data;
225
0
  tqueue->send_max_len = max_len;
226
227
0
  tqueue->name = curlx_strdup(name);
228
0
  if(!tqueue->name)
229
0
    goto out;
230
231
0
  result = Curl_thrdpool_create(&tqueue->tpool, name,
232
0
                                min_threads, max_threads, idle_time_ms,
233
0
                                thrdq_tpool_take,
234
0
                                thrdq_tpool_process,
235
0
                                thrdq_tpool_return,
236
0
                                tqueue);
237
238
0
out:
239
0
  if(result && tqueue) {
240
0
    tqueue->aborted = TRUE;
241
0
    thrdq_unlink(tqueue, FALSE, TRUE);
242
0
    tqueue = NULL;
243
0
  }
244
0
  *ptqueue = tqueue;
245
0
  return result;
246
0
}
247
248
void Curl_thrdq_destroy(struct curl_thrdq *tqueue, bool join)
249
0
{
250
0
  Curl_mutex_acquire(&tqueue->lock);
251
0
  DEBUGASSERT(!tqueue->aborted);
252
0
  tqueue->aborted = TRUE;
253
0
  thrdq_unlink(tqueue, TRUE, join);
254
0
}
255
256
/* Append `item` to the send list for processing by the thread pool.
 *
 * description  stored with the item (pointer is kept, not copied)
 * timeout_ms   0 for no timeout, negative counts as already timed out
 *
 * Returns
 *  CURLE_OK                  the item was queued
 *  CURLE_AGAIN               the send list is at its maximum length
 *  CURLE_OPERATION_TIMEDOUT  timeout_ms is negative
 *  CURLE_OUT_OF_MEMORY       the queue entry could not be allocated
 *  CURLE_SEND_ERROR          the queue is aborted */
CURLcode Curl_thrdq_send(struct curl_thrdq *tqueue, void *item,
                         const char *description, timediff_t timeout_ms)
{
  CURLcode result;
  size_t signals = 0;

  Curl_mutex_acquire(&tqueue->lock);
  if(tqueue->aborted) {
    DEBUGASSERT(0);
    result = CURLE_SEND_ERROR;
  }
  else if(timeout_ms < 0)
    result = CURLE_OPERATION_TIMEDOUT;
  else if(tqueue->send_max_len &&
          (Curl_llist_count(&tqueue->sendq) >= tqueue->send_max_len))
    result = CURLE_AGAIN; /* limited and full */
  else {
    struct thrdq_item *qitem = thrdq_item_create(tqueue, item, description,
                                                 timeout_ms);
    if(!qitem)
      result = CURLE_OUT_OF_MEMORY;
    else {
      Curl_llist_append(&tqueue->sendq, qitem, &qitem->node);
      signals = Curl_llist_count(&tqueue->sendq);
      result = CURLE_OK;
    }
  }
  Curl_mutex_release(&tqueue->lock);

  /* The pool is signalled without holding the lock to avoid deadlocks.
   * The item is on the list by now and may already have been taken for
   * processing. A failure to signal cannot be reported to the caller
   * since it needs to give up ownership of item. */
  if(!result && signals)
    (void)Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals);
  return result;
}
297
298
CURLcode Curl_thrdq_recv(struct curl_thrdq *tqueue, void **pitem)
299
0
{
300
0
  CURLcode result = CURLE_AGAIN;
301
0
  struct Curl_llist_node *e;
302
303
0
  *pitem = NULL;
304
0
  Curl_mutex_acquire(&tqueue->lock);
305
0
  if(tqueue->aborted) {
306
0
    DEBUGASSERT(0);
307
0
    result = CURLE_RECV_ERROR;
308
0
    goto out;
309
0
  }
310
311
0
  e = Curl_llist_head(&tqueue->recvq);
312
0
  if(e) {
313
0
    struct thrdq_item *qitem = Curl_node_take_elem(e);
314
0
    *pitem = qitem->item;
315
0
    qitem->item = NULL;
316
0
    thrdq_item_destroy(qitem);
317
0
    result = CURLE_OK;
318
0
  }
319
0
out:
320
0
  Curl_mutex_release(&tqueue->lock);
321
0
  return result;
322
0
}
323
324
static void thrdq_llist_clean_matches(struct Curl_llist *llist,
325
                                      Curl_thrdq_item_match_cb *fn_match,
326
                                      void *match_data)
327
0
{
328
0
  struct Curl_llist_node *e, *n;
329
0
  struct thrdq_item *qitem;
330
331
0
  for(e = Curl_llist_head(llist); e; e = n) {
332
0
    n = Curl_node_next(e);
333
0
    qitem = Curl_node_elem(e);
334
0
    if(fn_match(qitem->item, match_data))
335
0
      Curl_node_remove(e);
336
0
  }
337
0
}
338
339
void Curl_thrdq_clear(struct curl_thrdq *tqueue,
340
                      Curl_thrdq_item_match_cb *fn_match,
341
                      void *match_data)
342
0
{
343
0
  Curl_mutex_acquire(&tqueue->lock);
344
0
  if(tqueue->aborted) {
345
0
    DEBUGASSERT(0);
346
0
    goto out;
347
0
  }
348
0
  thrdq_llist_clean_matches(&tqueue->sendq, fn_match, match_data);
349
0
  thrdq_llist_clean_matches(&tqueue->recvq, fn_match, match_data);
350
0
out:
351
0
  Curl_mutex_release(&tqueue->lock);
352
0
}
353
354
#ifdef UNITTESTS
355
/* @unittest 3301 */
356
UNITTEST CURLcode thrdq_await_done(struct curl_thrdq *tqueue,
                                   uint32_t timeout_ms);
/* Unit test helper: forwards to Curl_thrdpool_await_idle() on the queue's
 * thread pool and returns its result. */
UNITTEST CURLcode thrdq_await_done(struct curl_thrdq *tqueue,
                                   uint32_t timeout_ms)
{
  CURLcode result = Curl_thrdpool_await_idle(tqueue->tpool, timeout_ms);

  return result;
}
363
#endif
364
365
CURLcode Curl_thrdq_set_props(struct curl_thrdq *tqueue,
366
                              uint32_t max_len,
367
                              uint32_t min_threads,
368
                              uint32_t max_threads,
369
                              uint32_t idle_time_ms)
370
0
{
371
0
  CURLcode result;
372
0
  size_t signals;
373
374
0
  Curl_mutex_acquire(&tqueue->lock);
375
0
  tqueue->send_max_len = max_len;
376
0
  signals = Curl_llist_count(&tqueue->sendq);
377
0
  Curl_mutex_release(&tqueue->lock);
378
379
0
  result = Curl_thrdpool_set_props(tqueue->tpool, min_threads,
380
0
                                   max_threads, idle_time_ms);
381
0
  if(!result && signals)
382
0
    result = Curl_thrdpool_signal(tqueue->tpool, (uint32_t)signals);
383
0
  return result;
384
0
}
385
386
#ifdef CURLVERBOSE
387
void Curl_thrdq_trace(struct curl_thrdq *tqueue,
388
                      struct Curl_easy *data)
389
0
{
390
0
  struct curl_trc_feat *feat = &Curl_trc_feat_threads;
391
0
  if(Curl_trc_ft_is_verbose(data, feat)) {
392
0
    struct Curl_llist_node *e;
393
0
    struct thrdq_item *qitem;
394
395
0
    Curl_thrdpool_trace(tqueue->tpool, data);
396
0
    Curl_mutex_acquire(&tqueue->lock);
397
0
    if(!Curl_llist_count(&tqueue->sendq) &&
398
0
       !Curl_llist_count(&tqueue->recvq)) {
399
0
      Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] empty", tqueue->name);
400
0
    }
401
0
    for(e = Curl_llist_head(&tqueue->sendq); e; e = Curl_node_next(e)) {
402
0
      qitem = Curl_node_elem(e);
403
0
      Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] in: %s",
404
0
                          tqueue->name, qitem->description);
405
0
    }
406
0
    for(e = Curl_llist_head(&tqueue->recvq); e; e = Curl_node_next(e)) {
407
0
      qitem = Curl_node_elem(e);
408
0
      Curl_trc_feat_infof(data, feat, "[TQUEUE-%s] out: %s",
409
0
                          tqueue->name, qitem->description);
410
0
    }
411
0
    Curl_mutex_release(&tqueue->lock);
412
0
  }
413
0
}
414
#endif
415
416
#endif /* USE_THREADS */