Coverage Report

Created: 2026-01-16 06:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/samba/lib/tevent/tevent_queue.c
Line
Count
Source
1
/*
2
   Unix SMB/CIFS implementation.
3
   Infrastructure for async requests
4
   Copyright (C) Volker Lendecke 2008
5
   Copyright (C) Stefan Metzmacher 2009
6
7
     ** NOTE! The following LGPL license applies to the tevent
8
     ** library. This does NOT imply that all of Samba is released
9
     ** under the LGPL
10
11
   This library is free software; you can redistribute it and/or
12
   modify it under the terms of the GNU Lesser General Public
13
   License as published by the Free Software Foundation; either
14
   version 3 of the License, or (at your option) any later version.
15
16
   This library is distributed in the hope that it will be useful,
17
   but WITHOUT ANY WARRANTY; without even the implied warranty of
18
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19
   Lesser General Public License for more details.
20
21
   You should have received a copy of the GNU Lesser General Public
22
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
23
*/
24
25
#include "replace.h"
26
#include "tevent.h"
27
#include "tevent_internal.h"
28
#include "tevent_util.h"
29
30
#undef tevent_queue_add
31
#undef tevent_queue_add_entry
32
#undef tevent_queue_add_optimize_empty
33
34
struct tevent_queue_entry {
	/* doubly linked list membership in the owning queue's 'list' */
	struct tevent_queue_entry *prev, *next;
	/* back pointer to the owning queue; the destructor tolerates NULL */
	struct tevent_queue *queue;

	/*
	 * set to true right before the trigger callback is invoked;
	 * can be reset via tevent_queue_entry_untrigger() on a stopped queue
	 */
	bool triggered;

	/* the request waiting in the queue */
	struct tevent_req *req;
	/* event context used when scheduling the queue's immediate event */
	struct tevent_context *ev;

	/* callback invoked when this entry is run; never NULL (noop default) */
	tevent_queue_trigger_fn_t trigger;
	/* name passed to the call-depth/trace machinery; may be NULL */
	const char *trigger_name;
	/* opaque pointer handed to 'trigger' */
	void *private_data;
	/* caller-defined tag, see tevent_queue_entry_set_tag() */
	uint64_t tag;
};
48
49
struct tevent_queue {
	/* talloc'ed copy of the queue name, see _tevent_queue_create() */
	const char *name;
	/* caller's source location string; not copied */
	const char *location;

	/* true while the queue processes entries; toggled by start/stop */
	bool running;
	/* reusable immediate event used to fire the head entry's trigger */
	struct tevent_immediate *immediate;

	/* number of entries currently in 'list' */
	size_t length;
	/* head of the pending entry list */
	struct tevent_queue_entry *list;
};
59
60
static void tevent_queue_immediate_trigger(struct tevent_context *ev,
61
             struct tevent_immediate *im,
62
             void *private_data);
63
64
/*
 * Talloc destructor for a queue entry: emit the DETACH trace and
 * call-depth LEAVE notification, unlink the entry from its queue,
 * and — if the queue is still running and the new head entry has not
 * been triggered yet — schedule the immediate event that will run it.
 */
static int tevent_queue_entry_destructor(struct tevent_queue_entry *e)
{
	struct tevent_queue *q = e->queue;

	if (!q) {
		return 0;
	}

	/*
	 * NOTE(review): the trace/notify calls below read q->list (the
	 * current list head) for ev/req rather than e itself.  If e is
	 * not the head entry, these report the head's request, not the
	 * entry actually being removed — confirm this is intentional.
	 */
	tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH);
	tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_LEAVE,
					q->list->req,
					q->list->req->internal.call_depth,
					e->trigger_name);
	DLIST_REMOVE(q->list, e);
	q->length--;

	if (!q->running) {
		return 0;
	}

	if (!q->list) {
		return 0;
	}

	if (q->list->triggered) {
		return 0;
	}

	/* kick the new head entry via the queue's immediate event */
	tevent_schedule_immediate(q->immediate,
				  q->list->ev,
				  tevent_queue_immediate_trigger,
				  q);

	return 0;
}
99
100
static int tevent_queue_destructor(struct tevent_queue *q)
101
0
{
102
0
  q->running = false;
103
104
0
  while (q->list) {
105
0
    struct tevent_queue_entry *e = q->list;
106
0
    talloc_free(e);
107
0
  }
108
109
0
  return 0;
110
0
}
111
112
/*
 * Create a tevent queue.
 *
 * @param mem_ctx	talloc parent of the new queue
 * @param name		queue name, duplicated onto the queue
 * @param location	caller source location, stored by reference
 * @return		the new queue, or NULL on allocation failure
 */
struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx,
					  const char *name,
					  const char *location)
{
	struct tevent_queue *q = talloc_zero(mem_ctx, struct tevent_queue);

	if (q == NULL) {
		return NULL;
	}

	q->name = talloc_strdup(q, name);
	if (q->name == NULL) {
		goto fail;
	}

	q->immediate = tevent_create_immediate(q);
	if (q->immediate == NULL) {
		goto fail;
	}

	q->location = location;

	/* queue is running by default */
	q->running = true;

	talloc_set_destructor(q, tevent_queue_destructor);
	return q;

fail:
	talloc_free(q);
	return NULL;
}
142
143
/*
 * Immediate event handler: invoke the trigger of the entry at the
 * head of the queue, if the queue is running and the head has not
 * been triggered already.
 */
static void tevent_queue_immediate_trigger(struct tevent_context *ev,
					   struct tevent_immediate *im,
					   void *private_data)
{
	struct tevent_queue *q =
		talloc_get_type_abort(private_data,
		struct tevent_queue);

	if (!q->running) {
		return;
	}

	if (!q->list) {
		return;
	}

	tevent_trace_queue_callback(ev, q->list,
				    TEVENT_EVENT_TRACE_BEFORE_HANDLER);
	/* Set the call depth of the request coming from the queue. */
	tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_TRIGGER,
					q->list->req,
					q->list->req->internal.call_depth,
					q->list->trigger_name);
	/* mark triggered before invoking the callback */
	q->list->triggered = true;
	q->list->trigger(q->list->req, q->list->private_data);
}
169
170
/*
 * Default trigger used when a caller passes trigger == NULL:
 * the entry does nothing except block the queue until it is freed.
 */
static void tevent_queue_noop_trigger(struct tevent_req *req,
				      void *_private_data)
{
	/* intentionally empty */
}
175
176
/*
 * Common worker for all tevent_queue_add*() variants.
 *
 * Allocates a queue entry as a talloc child of 'req', appends it to
 * the queue, and arranges for its trigger to run: either directly
 * (only when 'allow_direct' survives the checks below) or via the
 * queue's immediate event.
 *
 * @param allow_direct	if true and the queue was empty and the request
 *			has no async callback yet, invoke the trigger
 *			inline instead of via an immediate event
 * @return		the new entry, or NULL on allocation failure
 */
static struct tevent_queue_entry *tevent_queue_add_internal(
					struct tevent_queue *queue,
					struct tevent_context *ev,
					struct tevent_req *req,
					tevent_queue_trigger_fn_t trigger,
					const char *trigger_name,
					void *private_data,
					bool allow_direct)
{
	struct tevent_queue_entry *e;

	e = talloc_zero(req, struct tevent_queue_entry);
	if (e == NULL) {
		return NULL;
	}

	/*
	 * if there is no trigger, it is just a blocker
	 */
	if (trigger == NULL) {
		trigger = tevent_queue_noop_trigger;
	}

	e->queue = queue;
	e->req = req;
	e->ev = ev;
	e->trigger = trigger;
	e->trigger_name = trigger_name;
	e->private_data = private_data;

	if (queue->length > 0) {
		/*
		 * if there are already entries in the
		 * queue do not optimize.
		 */
		allow_direct = false;
	}

	if (req->async.fn != NULL) {
		/*
		 * If the caller wants to optimize for the
		 * empty queue case, call the trigger only
		 * if there is no callback defined for the
		 * request yet.
		 */
		allow_direct = false;
	}

	DLIST_ADD_END(queue->list, e);
	queue->length++;
	/* destructor is set before the trace/notify ATTACH below */
	talloc_set_destructor(e, tevent_queue_entry_destructor);
	tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH);
	tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_ENTER,
					req,
					req->internal.call_depth,
					e->trigger_name);

	if (!queue->running) {
		return e;
	}

	if (queue->list->triggered) {
		return e;
	}

	/*
	 * If allowed we directly call the trigger
	 * avoiding possible delays caused by
	 * an immediate event.
	 */
	if (allow_direct) {
		tevent_trace_queue_callback(ev,
					    queue->list,
					    TEVENT_EVENT_TRACE_BEFORE_HANDLER);
		/* mark triggered before invoking the callback */
		queue->list->triggered = true;
		queue->list->trigger(queue->list->req,
				     queue->list->private_data);
		return e;
	}

	tevent_schedule_immediate(queue->immediate,
				  queue->list->ev,
				  tevent_queue_immediate_trigger,
				  queue);

	return e;
}
263
264
bool tevent_queue_add(struct tevent_queue *queue,
265
          struct tevent_context *ev,
266
          struct tevent_req *req,
267
          tevent_queue_trigger_fn_t trigger,
268
          void *private_data)
269
0
{
270
0
  return _tevent_queue_add(queue, ev, req, trigger, NULL, private_data);
271
0
}
272
273
bool _tevent_queue_add(struct tevent_queue *queue,
274
          struct tevent_context *ev,
275
          struct tevent_req *req,
276
          tevent_queue_trigger_fn_t trigger,
277
          const char* trigger_name,
278
          void *private_data)
279
0
{
280
0
  struct tevent_queue_entry *e;
281
282
0
  e = tevent_queue_add_internal(queue, ev, req,
283
0
              trigger, trigger_name,
284
0
              private_data, false);
285
0
  if (e == NULL) {
286
0
    return false;
287
0
  }
288
289
0
  return true;
290
0
}
291
292
struct tevent_queue_entry *tevent_queue_add_entry(
293
          struct tevent_queue *queue,
294
          struct tevent_context *ev,
295
          struct tevent_req *req,
296
          tevent_queue_trigger_fn_t trigger,
297
          void *private_data)
298
0
{
299
0
  return _tevent_queue_add_entry(queue, ev, req,
300
0
               trigger, NULL,
301
0
               private_data);
302
0
}
303
304
struct tevent_queue_entry *_tevent_queue_add_entry(
305
          struct tevent_queue *queue,
306
          struct tevent_context *ev,
307
          struct tevent_req *req,
308
          tevent_queue_trigger_fn_t trigger,
309
          const char* trigger_name,
310
          void *private_data)
311
0
{
312
0
  return tevent_queue_add_internal(queue, ev, req,
313
0
           trigger, trigger_name,
314
0
           private_data, false);
315
0
}
316
317
struct tevent_queue_entry *tevent_queue_add_optimize_empty(
318
          struct tevent_queue *queue,
319
          struct tevent_context *ev,
320
          struct tevent_req *req,
321
          tevent_queue_trigger_fn_t trigger,
322
          void *private_data)
323
0
{
324
0
  return _tevent_queue_add_optimize_empty(queue, ev, req,
325
0
            trigger, NULL,
326
0
            private_data);
327
0
}
328
329
struct tevent_queue_entry *_tevent_queue_add_optimize_empty(
330
          struct tevent_queue *queue,
331
          struct tevent_context *ev,
332
          struct tevent_req *req,
333
          tevent_queue_trigger_fn_t trigger,
334
          const char* trigger_name,
335
          void *private_data)
336
0
{
337
0
  return tevent_queue_add_internal(queue, ev, req,
338
0
           trigger, trigger_name,
339
0
           private_data, true);
340
0
}
341
342
void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry)
343
0
{
344
0
  if (entry->queue->running) {
345
0
    abort();
346
0
  }
347
348
0
  if (entry->queue->list != entry) {
349
0
    abort();
350
0
  }
351
352
0
  entry->triggered = false;
353
0
}
354
355
void tevent_queue_start(struct tevent_queue *queue)
356
0
{
357
0
  if (queue->running) {
358
    /* already started */
359
0
    return;
360
0
  }
361
362
0
  queue->running = true;
363
364
0
  if (!queue->list) {
365
0
    return;
366
0
  }
367
368
0
  if (queue->list->triggered) {
369
0
    return;
370
0
  }
371
372
0
  tevent_schedule_immediate(queue->immediate,
373
0
          queue->list->ev,
374
0
          tevent_queue_immediate_trigger,
375
0
          queue);
376
0
}
377
378
void tevent_queue_stop(struct tevent_queue *queue)
379
0
{
380
0
  queue->running = false;
381
0
}
382
383
size_t tevent_queue_length(struct tevent_queue *queue)
384
0
{
385
0
  return queue->length;
386
0
}
387
388
bool tevent_queue_running(struct tevent_queue *queue)
389
0
{
390
0
  return queue->running;
391
0
}
392
393
/* State for tevent_queue_wait_send(); carries no data, the dummy
 * member only keeps the struct non-empty. */
struct tevent_queue_wait_state {
	uint8_t dummy;
};
396
397
static void tevent_queue_wait_trigger(struct tevent_req *req,
398
              void *private_data);
399
400
/*
 * Create a request that completes once it reaches the head of the
 * queue — i.e. once everything queued before it has finished.
 *
 * @return the request, or NULL on allocation failure; on internal
 *         OOM the error is delivered through the request itself
 */
struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct tevent_queue *queue)
{
	struct tevent_queue_wait_state *state = NULL;
	struct tevent_req *req = NULL;

	req = tevent_req_create(mem_ctx, &state,
				struct tevent_queue_wait_state);
	if (req == NULL) {
		return NULL;
	}

	if (!_tevent_queue_add(queue, ev, req,
			       tevent_queue_wait_trigger,
			       "tevent_queue_wait_trigger",
			       NULL)) {
		tevent_req_oom(req);
		return tevent_req_post(req, ev);
	}

	return req;
}
425
426
/*
 * Trigger for tevent_queue_wait_send(): reaching the head of the
 * queue is the whole job, so just complete the request.
 */
static void tevent_queue_wait_trigger(struct tevent_req *req,
				      void *private_data)
{
	tevent_req_done(req);
}
431
432
bool tevent_queue_wait_recv(struct tevent_req *req)
433
0
{
434
0
  enum tevent_req_state state;
435
0
  uint64_t err;
436
437
0
  if (tevent_req_is_error(req, &state, &err)) {
438
0
    tevent_req_received(req);
439
0
    return false;
440
0
  }
441
442
0
  tevent_req_received(req);
443
0
  return true;
444
0
}
445
446
/*
 * Attach a caller-defined tag to a queue entry.
 * A NULL entry is silently ignored.
 */
void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag)
{
	if (qe != NULL) {
		qe->tag = tag;
	}
}
454
455
uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe)
456
0
{
457
0
  if (qe == NULL) {
458
0
    return 0;
459
0
  }
460
461
0
  return qe->tag;
462
0
}