Coverage Report

Created: 2024-02-29 06:05

/src/strongswan/src/libstrongswan/processing/watcher.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2016-2023 Tobias Brunner
3
 * Copyright (C) 2013 Martin Willi
4
 *
5
 * Copyright (C) secunet Security Networks AG
6
 *
7
 * This program is free software; you can redistribute it and/or modify it
8
 * under the terms of the GNU General Public License as published by the
9
 * Free Software Foundation; either version 2 of the License, or (at your
10
 * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
11
 *
12
 * This program is distributed in the hope that it will be useful, but
13
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
14
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15
 * for more details.
16
 */
17
18
#include "watcher.h"
19
20
#include <library.h>
21
#include <threading/thread.h>
22
#include <threading/mutex.h>
23
#include <threading/condvar.h>
24
#include <collections/linked_list.h>
25
#include <processing/jobs/callback_job.h>
26
27
#include <unistd.h>
28
#include <errno.h>
29
#include <fcntl.h>
30
31
/* Forward declarations, the struct definitions below refer to these names. */
typedef struct private_watcher_t private_watcher_t;
typedef struct entry_t entry_t;
33
34
/**
 * Private data of a watcher_t object.
 */
struct private_watcher_t {

  /**
   * Public watcher_t interface.
   */
  watcher_t public;

  /**
   * List of registered FDs, head of a singly linked list that is chained
   * via entry_t.next
   */
  entry_t *fds;

  /**
   * Last registered FD, i.e. the tail of the list new entries get appended
   * to (NULL if the list is empty)
   */
  entry_t *last;

  /**
   * Number of registered FDs
   */
  u_int count;

  /**
   * Pending update of FD list? Set when the watcher thread is notified and
   * reset by the watcher thread once it has read the notification.
   */
  bool pending;

  /**
   * Running state of watcher
   */
  watcher_state_t state;

  /**
   * Lock to access FD list
   */
  mutex_t *mutex;

  /**
   * Condvar to signal completion of callback
   */
  condvar_t *condvar;

  /**
   * Notification pipe to signal watcher thread, index 0 is the end that is
   * read/polled, index 1 the end that is written to. An end is -1 if it is
   * not available.
   */
  int notify[2];

  /**
   * List of callback jobs to process by watcher thread, as job_t
   */
  linked_list_t *jobs;
};
89
90
/**
 * Entry for a registered file descriptor
 */
struct entry_t {
  /** file descriptor */
  int fd;
  /** events to watch, a combination of WATCHER_READ and WATCHER_WRITE */
  watcher_event_t events;
  /** registered callback function */
  watcher_cb_t cb;
  /** user data to pass to callback */
  void *data;
  /** callback(s) currently active? counts the queued/running callbacks, the
   * FD is not polled while this is non-zero */
  int in_callback;
  /** next registered fd */
  entry_t *next;
};
107
108
/**
109
 * Adds the given entry at the end of the list
110
 */
111
static void add_entry(private_watcher_t *this, entry_t *entry)
112
0
{
113
0
  if (this->last)
114
0
  {
115
0
    this->last->next = entry;
116
0
    this->last = entry;
117
0
  }
118
0
  else
119
0
  {
120
0
    this->fds = this->last = entry;
121
0
  }
122
0
  this->count++;
123
0
}
124
125
/**
126
 * Removes and frees the given entry
127
 *
128
 * Updates the previous entry and returns the next entry in the list, if any.
129
 */
130
static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
131
               entry_t *prev)
132
0
{
133
0
  entry_t *next = entry->next;
134
135
0
  if (prev)
136
0
  {
137
0
    prev->next = next;
138
0
  }
139
0
  else
140
0
  {
141
0
    this->fds = next;
142
0
  }
143
0
  if (this->last == entry)
144
0
  {
145
0
    this->last = prev;
146
0
  }
147
0
  this->count--;
148
0
  free(entry);
149
0
  return next;
150
0
}
151
152
/**
 * Data we pass on for an async notification
 *
 * Created by notify() as a snapshot of an entry for one specific event and
 * freed by notify_end().
 */
typedef struct {
  /** triggering entry, only compared by address when the job is done */
  entry_t *entry;
  /** file descriptor */
  int fd;
  /** event type */
  watcher_event_t event;
  /** registered callback function */
  watcher_cb_t cb;
  /** user data to pass to callback */
  void *data;
  /** keep registered? set to the callback's return value, or FALSE if the
   * thread executing the callback gets canceled */
  bool keep;
  /** reference to watcher */
  private_watcher_t *this;
} notify_data_t;
171
172
/**
173
 * Notify watcher thread about changes and unlock mutex
174
 */
175
static void update_and_unlock(private_watcher_t *this)
176
0
{
177
0
  char buf[1] = { 'u' };
178
0
  int error = 0;
179
180
0
  this->pending = TRUE;
181
0
  if (this->notify[1] != -1)
182
0
  {
183
0
    if (write(this->notify[1], buf, sizeof(buf)) == -1)
184
0
    {
185
0
      error = errno;
186
0
    }
187
0
  }
188
0
  this->mutex->unlock(this->mutex);
189
190
0
  if (error)
191
0
  {
192
0
    DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(error));
193
0
  }
194
0
}
195
196
/**
197
 * Cleanup function if callback gets canceled
198
 */
199
static void unregister(notify_data_t *data)
200
0
{
201
  /* if a thread processing a callback gets canceled, we mark the entry
202
   * as canceled, like the callback would return FALSE. This is required
203
   * to not queue this watcher again if all threads have been gone. */
204
0
  data->keep = FALSE;
205
0
}
206
207
 /**
208
 * Execute callback of registered FD, asynchronous
209
 */
210
static job_requeue_t notify_async(notify_data_t *data)
211
0
{
212
0
  thread_cleanup_push((void*)unregister, data);
213
0
  data->keep = data->cb(data->data, data->fd, data->event);
214
0
  thread_cleanup_pop(FALSE);
215
0
  return JOB_REQUEUE_NONE;
216
0
}
217
218
/**
219
 * Clean up notification data, reactivate FD
220
 */
221
static void notify_end(notify_data_t *data)
222
0
{
223
0
  private_watcher_t *this = data->this;
224
0
  entry_t *entry, *prev = NULL;
225
0
  watcher_event_t updated = 0;
226
0
  bool removed = FALSE;
227
228
  /* reactivate the disabled entry */
229
0
  this->mutex->lock(this->mutex);
230
0
  for (entry = this->fds; entry; prev = entry, entry = entry->next)
231
0
  {
232
0
    if (entry == data->entry)
233
0
    {
234
0
      if (!data->keep)
235
0
      {
236
0
        entry->events &= ~data->event;
237
0
        updated = entry->events;
238
0
        if (!entry->events)
239
0
        {
240
0
          remove_entry(this, entry, prev);
241
0
          removed = TRUE;
242
0
          break;
243
0
        }
244
0
      }
245
0
      entry->in_callback--;
246
0
      break;
247
0
    }
248
0
  }
249
0
  this->condvar->broadcast(this->condvar);
250
0
  update_and_unlock(this);
251
252
0
  if (removed)
253
0
  {
254
0
    DBG3(DBG_JOB, "removed fd %d[%s%s] from watcher after callback", data->fd,
255
0
       data->event & WATCHER_READ ? "r" : "",
256
0
       data->event & WATCHER_WRITE ? "w" : "");
257
0
  }
258
0
  else if (updated)
259
0
  {
260
0
    DBG3(DBG_JOB, "updated fd %d[%s%s] to %d[%s%s] after callback", data->fd,
261
0
       (updated | data->event) & WATCHER_READ ? "r" : "",
262
0
       (updated | data->event) & WATCHER_WRITE ? "w" : "", data->fd,
263
0
       updated & WATCHER_READ ? "r" : "",
264
0
       updated & WATCHER_WRITE ? "w" : "");
265
0
  }
266
0
  free(data);
267
0
}
268
269
/**
270
 * Execute the callback for a registered FD
271
 */
272
static void notify(private_watcher_t *this, entry_t *entry,
273
           watcher_event_t event)
274
0
{
275
0
  notify_data_t *data;
276
277
  /* get a copy of entry for async job, but with specific event */
278
0
  INIT(data,
279
0
    .entry = entry,
280
0
    .fd = entry->fd,
281
0
    .event = event,
282
0
    .cb = entry->cb,
283
0
    .data = entry->data,
284
0
    .keep = TRUE,
285
0
    .this = this,
286
0
  );
287
288
  /* deactivate entry, so we can poll() other FDs even if the async
289
   * processing did not handle the event yet */
290
0
  entry->in_callback++;
291
292
0
  this->jobs->insert_last(this->jobs,
293
0
          callback_job_create_with_prio((void*)notify_async, data,
294
0
            (void*)notify_end, (callback_job_cancel_t)return_false,
295
0
            JOB_PRIO_CRITICAL));
296
0
}
297
298
/**
299
 * Thread cancellation function for watcher thread
300
 */
301
static void activate_all(private_watcher_t *this)
302
0
{
303
0
  entry_t *entry;
304
305
  /* When the watcher thread gets canceled, we have to reactivate any entry
306
   * and signal threads in remove() to go on. */
307
308
0
  this->mutex->lock(this->mutex);
309
0
  for (entry = this->fds; entry; entry = entry->next)
310
0
  {
311
0
    entry->in_callback = 0;
312
0
  }
313
0
  this->state = WATCHER_STOPPED;
314
0
  this->condvar->broadcast(this->condvar);
315
0
  this->mutex->unlock(this->mutex);
316
0
}
317
318
/**
 * Looks up the returned events of a file descriptor in a pollfd set.
 *
 * @param pfd     array of pollfd structs
 * @param count   number of elements in pfd
 * @param fd      file descriptor to search
 * @return        revents of the first element matching fd, 0 if none matches
 */
static inline int find_revents(struct pollfd *pfd, int count, int fd)
{
  while (count-- > 0)
  {
    if (pfd->fd == fd)
    {
      return pfd->revents;
    }
    pfd++;
  }
  return 0;
}
334
335
/*
 * Helpers to assemble a textual list of FDs and their events for debug
 * output. With DEBUG_LEVEL below 2 they all expand to empty statement
 * expressions.
 */
#if DEBUG_LEVEL >= 2
/* start an empty FD log: buf must be an array (sizeof is applied to it), pos
 * and len receive the write position and the remaining space */
#define reset_log(buf, pos, len) ({ buf[0] = '\0'; pos = buf; len = sizeof(buf); })
/* start collecting event characters for one FD in buf */
#define reset_event_log(buf, pos) ({ pos = buf; })
/* null-terminate the collected event characters */
#define end_event_log(pos) ({ *pos = '\0'; })
/* append a single event character, no bounds check is done */
#define log_event(pos, ev) ({ *pos++ = ev; })
/* append " fd[ev]" to the FD log if ev is not empty; pos and len are only
 * advanced if snprintf() reports the text fit completely */
#define log_fd(pos, len, fd, ev) ({ \
  if (ev[0]) \
  { \
    int _add = snprintf(pos, len, " %d[%s]", fd, ev); \
    if (_add >= 0 && _add < len) \
    { \
      pos += _add; \
      len -= _add; \
    } \
  } \
})
#else
#define reset_event_log(...) ({})
#define end_event_log(...) ({})
#define log_event(...) ({})
#define reset_log(...) ({})
#define log_fd(...) ({})
#endif
358
359
/**
360
 * Dispatching function
361
 */
362
static job_requeue_t watch(private_watcher_t *this)
363
0
{
364
0
  entry_t *entry;
365
0
  struct pollfd *pfd;
366
0
  int count = 0, res;
367
0
#if DEBUG_LEVEL >= 2
368
0
  char logbuf[BUF_LEN], *logpos, eventbuf[4], *eventpos;
369
0
  int loglen;
370
0
#endif
371
372
0
  reset_log(logbuf, logpos, loglen);
373
374
0
  this->mutex->lock(this->mutex);
375
376
0
  count = this->count;
377
0
  if (!count)
378
0
  {
379
0
    this->state = WATCHER_STOPPED;
380
0
    this->mutex->unlock(this->mutex);
381
0
    return JOB_REQUEUE_NONE;
382
0
  }
383
0
  if (this->state == WATCHER_QUEUED)
384
0
  {
385
0
    this->state = WATCHER_RUNNING;
386
0
  }
387
388
0
  pfd = alloca(sizeof(*pfd) * (count + 1));
389
0
  pfd[0].fd = this->notify[0];
390
0
  pfd[0].events = POLLIN;
391
0
  count = 1;
392
393
0
  for (entry = this->fds; entry; entry = entry->next)
394
0
  {
395
0
    if (!entry->in_callback)
396
0
    {
397
0
      pfd[count].fd = entry->fd;
398
0
      pfd[count].events = 0;
399
0
      reset_event_log(eventbuf, eventpos);
400
0
      if (entry->events & WATCHER_READ)
401
0
      {
402
0
        log_event(eventpos, 'r');
403
0
        pfd[count].events |= POLLIN;
404
0
      }
405
0
      if (entry->events & WATCHER_WRITE)
406
0
      {
407
0
        log_event(eventpos, 'w');
408
0
        pfd[count].events |= POLLOUT;
409
0
      }
410
0
      end_event_log(eventpos);
411
0
      log_fd(logpos, loglen, entry->fd, eventbuf);
412
0
      count++;
413
0
    }
414
0
  }
415
0
  this->mutex->unlock(this->mutex);
416
417
0
#if DEBUG_LEVEL >= 3
418
0
  if (logbuf[0])
419
0
  {
420
0
    DBG3(DBG_JOB, "observing fds:%s", logbuf);
421
0
  }
422
0
#endif
423
424
0
  while (TRUE)
425
0
  {
426
0
    int revents;
427
0
    char buf[1];
428
0
    bool old;
429
0
    ssize_t len;
430
0
    job_t *job;
431
432
0
    DBG2(DBG_JOB, "watcher is observing %d fds", count-1);
433
0
    thread_cleanup_push((void*)activate_all, this);
434
0
    old = thread_cancelability(TRUE);
435
436
0
    res = poll(pfd, count, -1);
437
0
    if (res == -1 && errno == EINTR)
438
0
    {
439
      /* LinuxThreads interrupts poll(), but does not make it a
440
       * cancellation point. Manually test if we got canceled. */
441
0
      thread_cancellation_point();
442
0
    }
443
444
0
    thread_cancelability(old);
445
0
    thread_cleanup_pop(FALSE);
446
447
0
    if (res > 0)
448
0
    {
449
0
      if (pfd[0].revents & POLLIN)
450
0
      {
451
0
        while (TRUE)
452
0
        {
453
0
          len = read(this->notify[0], buf, sizeof(buf));
454
0
          if (len == -1)
455
0
          {
456
0
            if (errno != EAGAIN && errno != EWOULDBLOCK)
457
0
            {
458
0
              DBG1(DBG_JOB, "reading watcher notify failed: %s",
459
0
                 strerror(errno));
460
0
            }
461
0
            break;
462
0
          }
463
0
        }
464
0
        this->pending = FALSE;
465
0
        DBG2(DBG_JOB, "watcher got notification, rebuilding");
466
0
        break;
467
0
      }
468
469
0
      reset_log(logbuf, logpos, loglen);
470
0
      this->mutex->lock(this->mutex);
471
0
      for (entry = this->fds; entry; entry = entry->next)
472
0
      {
473
0
        if (entry->in_callback)
474
0
        {
475
0
          continue;
476
0
        }
477
0
        reset_event_log(eventbuf, eventpos);
478
0
        revents = find_revents(pfd, count, entry->fd);
479
0
        if (revents & POLLERR)
480
0
        {
481
0
          log_event(eventpos, 'e');
482
0
        }
483
0
        if (revents & POLLIN)
484
0
        {
485
0
          log_event(eventpos, 'r');
486
0
        }
487
0
        if (revents & POLLOUT)
488
0
        {
489
0
          log_event(eventpos, 'w');
490
0
        }
491
0
        if (entry->events & WATCHER_READ &&
492
0
          revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL))
493
0
        {
494
0
          notify(this, entry, WATCHER_READ);
495
0
        }
496
0
        if (entry->events & WATCHER_WRITE &&
497
0
          revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL))
498
0
        {
499
0
          notify(this, entry, WATCHER_WRITE);
500
0
        }
501
0
        end_event_log(eventpos);
502
0
        log_fd(logpos, loglen, entry->fd, eventbuf);
503
0
      }
504
0
      this->mutex->unlock(this->mutex);
505
506
0
#if DEBUG_LEVEL >= 2
507
0
      if (logbuf[0])
508
0
      {
509
0
        DBG2(DBG_JOB, "events on fds:%s", logbuf);
510
0
      }
511
0
#endif
512
513
0
      if (this->jobs->get_count(this->jobs))
514
0
      {
515
0
        while (this->jobs->remove_first(this->jobs,
516
0
                        (void**)&job) == SUCCESS)
517
0
        {
518
0
          lib->processor->execute_job(lib->processor, job);
519
0
        }
520
        /* we temporarily disable a notified FD, rebuild FDSET */
521
0
        break;
522
0
      }
523
0
    }
524
0
    else
525
0
    {
526
0
      if (!this->pending && errno != EINTR)
527
0
      { /* complain only if no pending updates */
528
0
        DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
529
0
      }
530
0
      break;
531
0
    }
532
0
  }
533
0
  return JOB_REQUEUE_DIRECT;
534
0
}
535
536
/**
 * Implementation of watcher_t.add: registers a callback for events on an FD.
 *
 * The new entry is appended to the FD list. If the watcher is stopped, a
 * critical-priority job running watch() is queued, otherwise the running
 * watcher thread is notified about the change.
 */
METHOD(watcher_t, add, void,
  private_watcher_t *this, int fd, watcher_event_t events,
  watcher_cb_t cb, void *data)
{
  entry_t *entry;

  INIT(entry,
    .data = data,
    .cb = cb,
    .events = events,
    .fd = fd,
  );

  DBG3(DBG_JOB, "adding fd %d[%s%s] to watcher", fd,
     events & WATCHER_READ ? "r" : "",
     events & WATCHER_WRITE ? "w" : "");

  this->mutex->lock(this->mutex);
  add_entry(this, entry);
  if (this->state != WATCHER_STOPPED)
  {
    /* watcher is queued or running, just let it know */
    update_and_unlock(this);
    return;
  }
  this->state = WATCHER_QUEUED;
  this->mutex->unlock(this->mutex);

  lib->processor->queue_job(lib->processor,
    (job_t*)callback_job_create_with_prio((void*)watch, this,
      NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
569
570
/**
 * Implementation of watcher_t.remove: unregisters all entries of an FD.
 *
 * If a callback for the FD is currently active (and the watcher is not
 * stopped), this waits on the condvar until it has completed and then walks
 * the list again from the start. The watcher thread is only notified if at
 * least one entry has been removed.
 */
METHOD(watcher_t, remove_, void,
  private_watcher_t *this, int fd)
{
  entry_t *entry, *prev;
  watcher_event_t found = 0;

  this->mutex->lock(this->mutex);
  while (TRUE)
  {
    bool is_in_callback = FALSE;

    /* every pass starts at the head of the list, so there is no previous
     * entry yet. The list may have changed while we waited below, hence a
     * predecessor found in an earlier pass must not be reused, it might
     * have been removed and freed in the meantime. */
    prev = NULL;
    entry = this->fds;
    while (entry)
    {
      if (entry->fd == fd)
      {
        if (this->state != WATCHER_STOPPED && entry->in_callback)
        {
          is_in_callback = TRUE;
          break;
        }
        found |= entry->events;
        /* prev stays the same, it now precedes the returned entry */
        entry = remove_entry(this, entry, prev);
        continue;
      }
      prev = entry;
      entry = entry->next;
    }
    if (!is_in_callback)
    {
      break;
    }
    /* releases the mutex while waiting for a callback to complete */
    this->condvar->wait(this->condvar, this->mutex);
  }
  if (found)
  {
    update_and_unlock(this);

    DBG3(DBG_JOB, "removed fd %d[%s%s] from watcher", fd,
       found & WATCHER_READ ? "r" : "",
       found & WATCHER_WRITE ? "w" : "");
  }
  else
  {
    this->mutex->unlock(this->mutex);
  }
}
617
618
METHOD(watcher_t, get_state, watcher_state_t,
619
  private_watcher_t *this)
620
0
{
621
0
  watcher_state_t state;
622
623
0
  this->mutex->lock(this->mutex);
624
0
  state = this->state;
625
0
  this->mutex->unlock(this->mutex);
626
627
0
  return state;
628
0
}
629
630
/**
 * Implementation of watcher_t.destroy: destroys the mutex and condvar, closes
 * both ends of the notification pipe (if open), destroys the job list and
 * frees the object.
 */
METHOD(watcher_t, destroy, void,
  private_watcher_t *this)
{
  int i;

  this->mutex->destroy(this->mutex);
  this->condvar->destroy(this->condvar);
  /* read end first, then write end */
  for (i = 0; i < 2; i++)
  {
    if (this->notify[i] != -1)
    {
      close(this->notify[i]);
    }
  }
  this->jobs->destroy(this->jobs);
  free(this);
}
646
647
#ifdef WIN32
648
649
/**
650
 * Create notify pipe with a TCP socketpair
651
 */
652
static bool create_notify(private_watcher_t *this)
653
{
654
  u_long on = 1;
655
656
  if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
657
  {
658
    /* use non-blocking I/O on read-end of notify pipe */
659
    if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
660
    {
661
      return TRUE;
662
    }
663
    DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
664
       "failed: %s", strerror(errno));
665
  }
666
  return FALSE;
667
}
668
669
#else /* !WIN32 */
670
671
/**
672
 * Create a notify pipe with a one-directional pipe
673
 */
674
static bool create_notify(private_watcher_t *this)
675
8.39k
{
676
8.39k
  int flags;
677
678
8.39k
  if (pipe(this->notify) == 0)
679
8.39k
  {
680
    /* use non-blocking I/O on read-end of notify pipe */
681
8.39k
    flags = fcntl(this->notify[0], F_GETFL);
682
8.39k
    if (flags != -1 &&
683
8.39k
      fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
684
8.39k
    {
685
8.39k
      return TRUE;
686
8.39k
    }
687
0
    DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
688
0
       "failed: %s", strerror(errno));
689
0
  }
690
0
  return FALSE;
691
8.39k
}
692
693
#endif /* !WIN32 */
694
695
/**
696
 * See header
697
 */
698
watcher_t *watcher_create()
699
8.39k
{
700
8.39k
  private_watcher_t *this;
701
702
8.39k
  INIT(this,
703
8.39k
    .public = {
704
8.39k
      .add = _add,
705
8.39k
      .remove = _remove_,
706
8.39k
      .get_state = _get_state,
707
8.39k
      .destroy = _destroy,
708
8.39k
    },
709
8.39k
    .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
710
8.39k
    .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
711
8.39k
    .jobs = linked_list_create(),
712
8.39k
    .notify = {-1, -1},
713
8.39k
    .state = WATCHER_STOPPED,
714
8.39k
  );
715
716
8.39k
  if (!create_notify(this))
717
0
  {
718
0
    DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
719
0
       strerror(errno));
720
0
  }
721
8.39k
  return &this->public;
722
8.39k
}