Coverage Report

Created: 2025-01-28 06:45

/src/tarantool/third_party/libeio/etp.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * libetp implementation
3
 *
4
 * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann <libetp@schmorp.de>
5
 * All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or without modifica-
8
 * tion, are permitted provided that the following conditions are met:
9
 *
10
 *   1.  Redistributions of source code must retain the above copyright notice,
11
 *       this list of conditions and the following disclaimer.
12
 *
13
 *   2.  Redistributions in binary form must reproduce the above copyright
14
 *       notice, this list of conditions and the following disclaimer in the
15
 *       documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19
 * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO
20
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21
 * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25
 * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26
 * OF THE POSSIBILITY OF SUCH DAMAGE.
27
 *
28
 * Alternatively, the contents of this file may be used under the terms of
29
 * the GNU General Public License ("GPL") version 2 or any later version,
30
 * in which case the provisions of the GPL are applicable instead of
31
 * the above. If you wish to allow the use of your version of this file
32
 * only under the terms of the GPL and not to allow others to use your
33
 * version of this file under the BSD license, indicate your decision
34
 * by deleting the provisions above and replace them with the notice
35
 * and other provisions required by the GPL. If you do not delete the
36
 * provisions above, a recipient may use your version of this file under
37
 * either the BSD or the GPL.
38
 */
39
40
/*
 * Compile-time configuration. Every setting below (except the two derived
 * constants at the end) is only a default: the including file may define
 * the macro beforehand to override it.
 */

/* storage class / linkage applied to the API functions of this file */
#ifndef ETP_API_DECL
# define ETP_API_DECL static
#endif

/* inclusive range of request priorities; defaults to a single level */
#ifndef ETP_PRI_MIN
# define ETP_PRI_MIN 0
# define ETP_PRI_MAX 0
#endif

#ifndef ETP_TYPE_QUIT
# define ETP_TYPE_QUIT 0
#endif

/* request type that etp_submit and etp_poll treat as a group request */
#ifndef ETP_TYPE_GROUP
# define ETP_TYPE_GROUP 1
#endif

/* signature of the want_poll/done_poll notification callbacks */
#ifndef ETP_CB
typedef void (*ETP_CB) (void *);
# define ETP_CB ETP_CB
#endif

/*
 * Notification hooks. The argument is parenthesised so that any pointer
 * expression (e.g. "&obj") can be passed, not only a plain identifier.
 */
#ifndef ETP_WANT_POLL
# define ETP_WANT_POLL(user) ((user)->want_poll_cb ((user)->userdata))
#endif
#ifndef ETP_DONE_POLL
# define ETP_DONE_POLL(user) ((user)->done_poll_cb ((user)->userdata))
#endif

/* number of distinct priority levels, i.e. the size of the per-priority arrays */
#define ETP_NUM_PRI ((ETP_PRI_MAX) - (ETP_PRI_MIN) + 1)

/* number of ~1024-microsecond ticks per second, rounded up */
#define ETP_TICKS ((1000000 + 1023) >> 10)
71
72
/* bits stored in the flags member of a request */
enum
{
  ETP_FLAG_GROUPADD = 1 << 2, /* 0x04: some request was added to the group */
  ETP_FLAG_DELAYED  = 1 << 3, /* 0x08: group request has been delayed */
};
76
77
/* calculate time difference in ~1/ETP_TICKS of a second */
78
ecb_inline int
79
etp_tvdiff (struct timeval *tv1, struct timeval *tv2)
80
0
{
81
0
  return  (tv2->tv_sec  - tv1->tv_sec ) * ETP_TICKS
82
0
       + ((tv2->tv_usec - tv1->tv_usec) >> 10);
83
0
}
84
85
/* scratch buffer that is grown on demand and reused between calls */
struct etp_tmpbuf
{
  void *ptr; /* current allocation; handed to free () before being replaced */
  int len;   /* number of bytes ptr is known to provide */
};

/*
 * Return a buffer of at least len bytes, reusing the current allocation
 * when it is already large enough.
 *
 * When the buffer has to grow, the old allocation is freed and a new one is
 * made, so previous contents are NOT preserved. Returns a null pointer if
 * the allocation fails; in that case the recorded length is reset to 0 so
 * that a later (possibly smaller) request retries the allocation instead of
 * handing out the null pointer as if it were a valid buffer.
 */
static void *
etp_tmpbuf_get (struct etp_tmpbuf *buf, int len)
{
  if (buf->len < len)
    {
      free (buf->ptr);
      buf->ptr = malloc (len);
      /* only remember the size of an allocation that actually exists */
      buf->len = buf->ptr ? len : 0;
    }

  return buf->ptr;
}
102
103
/*
104
 * a somewhat faster data structure might be nice, but
105
 * with 8 priorities this actually needs <20 insns
106
 * per shift, the most expensive operation.
107
 */
108
typedef struct
109
{
110
  ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
111
  int size;
112
} etp_reqq;
113
114
typedef struct etp_pool *etp_pool;
115
typedef struct etp_pool_user *etp_pool_user;
116
117
typedef struct etp_worker
118
{
119
  etp_pool pool;
120
121
  struct etp_tmpbuf tmpbuf;
122
123
#ifdef ETP_WORKER_COMMON
124
  ETP_WORKER_COMMON
125
#endif
126
} etp_worker;
127
128
struct etp_pool
129
{
130
   etp_reqq req_queue;
131
132
   unsigned int started, idle, wanted;
133
134
   unsigned int nreqs_run;    /* pool->lock */
135
   unsigned int max_idle;     /* maximum number of threads that can pool->idle indefinitely */
136
   unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */
137
138
   xmutex_t lock;
139
   xcond_t  reqwait;
140
   xcond_t  wrkwait;
141
142
   int (*on_start_cb)(void *data);
143
   void *on_start_data;
144
   int (*on_stop_cb)(void *data);
145
   void *on_stop_data;
146
};
147
148
struct etp_pool_user
149
{
150
   etp_pool pool;
151
152
   void *userdata;
153
154
   etp_reqq res_queue;
155
156
   unsigned int max_poll_time;
157
   unsigned int max_poll_reqs;
158
159
   unsigned int nreqs;
160
161
   ETP_CB want_poll_cb;
162
   ETP_CB done_poll_cb;
163
164
   xmutex_t lock;
165
};
166
167
/* worker threads management */
168
169
static void ecb_cold
170
etp_worker_free (etp_worker *wrk)
171
0
{
172
0
  free (wrk->tmpbuf.ptr);
173
0
  free (wrk);
174
0
}
175
176
ETP_API_DECL unsigned int
177
etp_nreqs (etp_pool_user user)
178
0
{
179
0
  return user->nreqs;
180
0
}
181
182
ETP_API_DECL unsigned int
183
etp_npending (etp_pool_user user)
184
0
{
185
0
  unsigned int retval;
186
187
0
  if (WORDACCESS_UNSAFE) X_LOCK   (user->lock);
188
0
  retval = user->res_queue.size;
189
0
  if (WORDACCESS_UNSAFE) X_UNLOCK (user->lock);
190
191
0
  return retval;
192
0
}
193
194
ETP_API_DECL unsigned int
195
etp_nthreads (etp_pool pool)
196
0
{
197
0
  unsigned int retval;
198
199
0
  if (WORDACCESS_UNSAFE) X_LOCK   (pool->lock);
200
0
  retval = pool->started;
201
0
  if (WORDACCESS_UNSAFE) X_UNLOCK (pool->lock);
202
203
0
  return retval;
204
0
}
205
206
static void ecb_noinline ecb_cold
207
reqq_init (etp_reqq *q)
208
0
{
209
0
  int pri;
210
211
0
  for (pri = 0; pri < ETP_NUM_PRI; ++pri)
212
0
    q->qs[pri] = q->qe[pri] = 0;
213
214
0
  q->size = 0;
215
0
}
216
217
static int ecb_noinline
218
reqq_push (etp_reqq *q, ETP_REQ *req)
219
0
{
220
0
  int pri = req->pri;
221
0
  req->next = 0;
222
223
0
  if (q->qe[pri])
224
0
    {
225
0
      q->qe[pri]->next = req;
226
0
      q->qe[pri] = req;
227
0
    }
228
0
  else
229
0
    q->qe[pri] = q->qs[pri] = req;
230
231
0
  return q->size++;
232
0
}
233
234
static ETP_REQ * ecb_noinline
235
reqq_shift (etp_reqq *q)
236
0
{
237
0
  int pri;
238
239
0
  if (!q->size)
240
0
    return 0;
241
242
0
  --q->size;
243
244
0
  for (pri = ETP_NUM_PRI; pri--; )
245
0
    {
246
0
      ETP_REQ *req = q->qs[pri];
247
248
0
      if (req)
249
0
        {
250
0
          if (!(q->qs[pri] = (ETP_REQ *)req->next))
251
0
            q->qe[pri] = 0;
252
253
0
          return req;
254
0
        }
255
0
    }
256
257
0
  abort ();
258
0
}
259
260
ETP_API_DECL int ecb_cold
261
etp_init (etp_pool pool)
262
0
{
263
0
  X_MUTEX_CREATE (pool->lock);
264
0
  X_COND_CREATE  (pool->reqwait);
265
0
  X_COND_CREATE  (pool->wrkwait);
266
267
0
  reqq_init (&pool->req_queue);
268
269
0
  pool->started  = 0;
270
0
  pool->idle     = 0;
271
0
  pool->wanted   = 4;
272
0
  pool->nreqs_run  = 0;
273
274
0
  pool->max_idle = 4;      /* maximum number of threads that can pool->idle indefinitely */
275
0
  pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */
276
277
0
  return 0;
278
0
}
279
280
ETP_API_DECL int ecb_cold
281
etp_user_init (etp_pool_user user, void *userdata, ETP_CB want_poll, ETP_CB done_poll)
282
0
{
283
0
  user->pool = NULL;
284
0
  X_MUTEX_CREATE (user->lock);
285
286
0
  reqq_init (&user->res_queue);
287
288
0
  user->max_poll_time = 0;
289
0
  user->max_poll_reqs = 0;
290
0
  user->nreqs    = 0;
291
292
0
  user->userdata     = userdata;
293
0
  user->want_poll_cb = want_poll;
294
0
  user->done_poll_cb = done_poll;
295
296
0
  return 0;
297
0
}
298
299
static void ecb_noinline ecb_cold
300
etp_proc_init (void)
301
0
{
302
0
#if HAVE_PRCTL_SET_NAME
303
  /* provide a more sensible "thread name" */
304
0
  char name[16 + 1];
305
0
  const int namelen = sizeof (name) - 1;
306
0
  int len;
307
308
0
  prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0);
309
0
  name [namelen] = 0;
310
0
  len = strlen (name);
311
0
  strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio");
312
0
  prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0);
313
0
#endif
314
0
}
315
316
X_THREAD_PROC (etp_proc)
317
0
{
318
0
  ETP_REQ *req;
319
0
  struct timespec ts;
320
0
  etp_pool pool = (etp_pool)thr_arg;
321
0
  etp_worker self = {};
322
0
  self.pool = pool;
323
0
  etp_pool_user user; /* per request */
324
325
0
  etp_proc_init ();
326
327
  /* try to distribute timeouts somewhat evenly (nanosecond part) */
328
0
  ts.tv_nsec = (unsigned long)random() * (1000000000UL / RAND_MAX);
329
330
0
  X_LOCK (pool->lock);
331
332
0
  if (pool->on_start_cb)
333
0
    if (pool->on_start_cb(pool->on_start_data))
334
0
      goto error;
335
336
0
  for (;;)
337
0
    {
338
0
      for (;;)
339
0
        {
340
0
          req = reqq_shift (&pool->req_queue);
341
342
0
          if (ecb_expect_true (req))
343
0
            break;
344
345
0
          if (pool->started > pool->wanted) /* someone is shrinking the pool */
346
0
            goto quit;
347
348
0
          ++pool->idle;
349
350
0
          if (pool->idle <= pool->max_idle)
351
0
            {
352
              /* we are allowed to pool->idle, so do so without any timeout */
353
0
              X_COND_WAIT (pool->reqwait, pool->lock);
354
0
              --pool->idle;
355
0
            }
356
0
          else
357
0
            {
358
0
              ts.tv_sec = time (0) + pool->idle_timeout;
359
360
0
              if (X_COND_TIMEDWAIT (pool->reqwait, pool->lock, ts) != ETIMEDOUT)
361
0
                continue;
362
363
0
              --pool->idle;
364
0
              goto quit;
365
0
            }
366
0
        }
367
368
0
      ++pool->nreqs_run;
369
370
0
      X_UNLOCK (pool->lock);
371
372
0
      user = req->pool_user;
373
0
      ETP_EXECUTE (&self, req);
374
375
0
      X_LOCK (user->lock);
376
377
0
      if (!reqq_push (&user->res_queue, req))
378
0
        ETP_WANT_POLL (user);
379
380
0
      X_UNLOCK (user->lock);
381
382
0
      X_LOCK (pool->lock);
383
0
      --pool->nreqs_run;
384
0
    }
385
386
0
quit:
387
0
  assert(pool->started > 0);
388
0
  pool->started--;
389
0
  X_COND_BROADCAST (pool->wrkwait);
390
0
  X_UNLOCK (pool->lock);
391
0
  if (pool->on_stop_cb)
392
0
    pool->on_stop_cb(pool->on_stop_data);
393
394
0
  return 0;
395
396
0
error:
397
0
  assert(pool->started > 0);
398
0
  pool->started--;
399
0
  X_COND_BROADCAST (pool->wrkwait);
400
0
  X_UNLOCK (pool->lock);
401
0
  return 0;
402
0
}
403
404
static void ecb_cold
405
etp_start_thread (etp_pool pool)
406
0
{
407
0
  xthread_t tid;
408
0
  int threads;
409
410
0
  if (xthread_create (&tid, etp_proc, (void *)pool) != 0)
411
0
    return;
412
413
0
  X_LOCK (pool->lock);
414
0
  assert(pool->started > 0);
415
0
  threads = --pool->started;
416
0
  X_COND_BROADCAST (pool->wrkwait);
417
0
  X_UNLOCK (pool->lock);
418
419
  /* Assume if at least one thread managed to start the queue will drain
420
   * eventually. If not, tasks will never complete; the best we can do
421
   * is to die now.
422
   */
423
0
  if (threads == 0)
424
0
    {
425
0
      fputs("failed to start thread in ETP pool", stderr);
426
0
      abort();
427
0
    }
428
0
}
429
430
ETP_API_DECL int
431
etp_poll (etp_pool_user user)
432
0
{
433
0
  unsigned int maxreqs;
434
0
  unsigned int maxtime;
435
0
  struct timeval tv_start, tv_now;
436
437
0
  maxreqs = user->max_poll_reqs;
438
0
  maxtime = user->max_poll_time;
439
440
0
  if (maxtime)
441
0
    gettimeofday (&tv_start, 0);
442
443
0
  for (;;)
444
0
    {
445
0
      ETP_REQ *req;
446
447
0
      X_LOCK (user->lock);
448
0
      req = reqq_shift (&user->res_queue);
449
450
0
      if (ecb_expect_true (req))
451
0
        {
452
0
          if (ecb_expect_true (user->nreqs))
453
0
            --user->nreqs;
454
455
0
          if (!user->res_queue.size)
456
0
            ETP_DONE_POLL (user);
457
0
        }
458
459
0
      X_UNLOCK (user->lock);
460
461
0
      if (ecb_expect_false (!req))
462
0
        return 0;
463
464
0
      if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size))
465
0
        {
466
0
          req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */
467
0
          continue;
468
0
        }
469
0
      else
470
0
        {
471
0
          int res = ETP_FINISH (req);
472
0
          if (ecb_expect_false (res))
473
0
            return res;
474
0
        }
475
476
0
      if (ecb_expect_false (maxreqs && !--maxreqs))
477
0
        break;
478
479
0
      if (maxtime)
480
0
        {
481
0
          gettimeofday (&tv_now, 0);
482
483
0
          if (etp_tvdiff (&tv_start, &tv_now) >= maxtime)
484
0
            break;
485
0
        }
486
0
    }
487
488
0
  errno = EAGAIN;
489
0
  return -1;
490
0
}
491
492
ETP_API_DECL void
493
etp_grp_cancel (etp_pool_user user, ETP_REQ *grp);
494
495
ETP_API_DECL void
496
etp_cancel (etp_pool_user user, ETP_REQ *req)
497
0
{
498
0
  req->cancelled = 1;
499
500
0
  etp_grp_cancel (user, req);
501
0
}
502
503
ETP_API_DECL void
504
etp_grp_cancel (etp_pool_user user, ETP_REQ *grp)
505
0
{
506
0
  for (grp = grp->grp_first; grp; grp = grp->grp_next)
507
0
    etp_cancel (user, grp);
508
0
}
509
510
ETP_API_DECL void
511
etp_submit (etp_pool_user user, ETP_REQ *req)
512
0
{
513
0
  req->pri -= ETP_PRI_MIN;
514
515
0
  if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
516
0
  if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
517
518
0
  user->nreqs++;
519
0
  if (ecb_expect_false (req->type == ETP_TYPE_GROUP))
520
0
    {
521
0
      X_LOCK (user->lock);
522
523
0
      if (!reqq_push (&user->res_queue, req))
524
0
        ETP_WANT_POLL (user);
525
526
0
      X_UNLOCK (user->lock);
527
0
    }
528
0
  else
529
0
    {
530
0
      etp_pool pool = user->pool;
531
0
      int need_thread = 0;
532
533
0
      X_LOCK (pool->lock);
534
0
      req->pool_user = user;
535
0
      reqq_push (&pool->req_queue, req);
536
0
      if (ecb_expect_false(pool->req_queue.size + pool->nreqs_run > pool->started &&
537
0
                           pool->started < pool->wanted))
538
0
        {
539
          /* arrange for a thread to start */
540
0
          need_thread = 1;
541
0
          pool->started++;
542
0
        }
543
0
      X_COND_SIGNAL (pool->reqwait);
544
0
      X_UNLOCK (pool->lock);
545
0
      if (ecb_expect_false(need_thread))
546
0
        etp_start_thread(pool);
547
0
    }
548
0
}
549
550
ETP_API_DECL void ecb_cold
551
etp_set_max_poll_time (etp_pool_user user, double seconds)
552
0
{
553
0
  user->max_poll_time = seconds * ETP_TICKS;
554
0
}
555
556
ETP_API_DECL void ecb_cold
557
etp_set_max_poll_reqs (etp_pool_user user, unsigned int maxreqs)
558
0
{
559
0
  user->max_poll_reqs = maxreqs;
560
0
}
561
562
ETP_API_DECL void ecb_cold
563
etp_set_thread_on_start(etp_pool pool, int (*on_start_cb)(void *), void *data)
564
0
{
565
0
  pool->on_start_cb = on_start_cb;
566
0
  pool->on_start_data = data;
567
0
}
568
569
ETP_API_DECL void ecb_cold
570
etp_set_thread_on_stop(etp_pool pool, int (*on_stop_cb)(void *), void *data)
571
0
{
572
0
  pool->on_stop_cb = on_stop_cb;
573
0
  pool->on_stop_data = data;
574
0
}
575
576
577
ETP_API_DECL void ecb_cold
578
etp_set_max_idle (etp_pool pool, unsigned int threads)
579
0
{
580
0
  X_LOCK   (pool->lock);
581
0
  pool->max_idle = threads;
582
0
  X_UNLOCK (pool->lock);
583
0
}
584
585
ETP_API_DECL void ecb_cold
586
etp_set_idle_timeout (etp_pool pool, unsigned int seconds)
587
0
{
588
0
  X_LOCK   (pool->lock);
589
0
  pool->idle_timeout = seconds;
590
0
  X_UNLOCK (pool->lock);
591
0
}
592
593
ETP_API_DECL void ecb_cold
594
etp_set_min_parallel (etp_pool pool, unsigned int threads)
595
0
{
596
0
  X_LOCK   (pool->lock);
597
0
  if (pool->wanted < threads)
598
0
    pool->wanted = threads;
599
0
  X_UNLOCK (pool->lock);
600
0
}
601
602
ETP_API_DECL int ecb_cold
603
etp_set_max_parallel (etp_pool pool, unsigned int threads)
604
0
{
605
0
  int retval;
606
0
  X_LOCK   (pool->lock);
607
0
  retval = pool->wanted;
608
0
  if (pool->wanted > threads)
609
0
    pool->wanted = threads;
610
611
0
  while (pool->started > pool->wanted)
612
0
    {
613
0
      X_COND_BROADCAST(pool->reqwait); /* wake idle threads */
614
0
      X_COND_WAIT(pool->wrkwait, pool->lock);
615
0
    }
616
0
  X_UNLOCK (pool->lock);
617
0
  return retval;
618
0
}