Coverage Report

Created: 2024-02-29 06:05

/src/strongswan/src/libstrongswan/networking/streams/stream_service.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2013 Martin Willi
3
 *
4
 * Copyright (C) secunet Security Networks AG
5
 *
6
 * This program is free software; you can redistribute it and/or modify it
7
 * under the terms of the GNU General Public License as published by the
8
 * Free Software Foundation; either version 2 of the License, or (at your
9
 * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
10
 *
11
 * This program is distributed in the hope that it will be useful, but
12
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14
 * for more details.
15
 */
16
17
#include <library.h>
18
#include <threading/thread.h>
19
#include <threading/mutex.h>
20
#include <threading/condvar.h>
21
#include <processing/jobs/callback_job.h>
22
23
#include "stream_service.h"
24
25
#include <errno.h>
26
#include <unistd.h>
27
#include <sys/stat.h>
28
29
typedef struct private_stream_service_t private_stream_service_t;
30
31
/**
32
 * Private data of an stream_service_t object.
33
 */
34
struct private_stream_service_t {
35
36
  /**
37
   * Public stream_service_t interface.
38
   */
39
  stream_service_t public;
40
41
  /**
42
   * Underlying socket
43
   */
44
  int fd;
45
46
  /**
47
   * Accept callback
48
   */
49
  stream_service_cb_t cb;
50
51
  /**
52
   * Accept callback data
53
   */
54
  void *data;
55
56
  /**
57
   * Job priority to invoke callback with
58
   */
59
  job_priority_t prio;
60
61
  /**
62
   * Maximum number of parallel callback invocations
63
   */
64
  u_int cncrncy;
65
66
  /**
67
   * Currently active jobs
68
   */
69
  u_int active;
70
71
  /**
72
   * Currently running jobs
73
   */
74
  u_int running;
75
76
  /**
77
   * mutex to lock active counter
78
   */
79
  mutex_t *mutex;
80
81
  /**
82
   * Condvar to wait for callback termination
83
   */
84
  condvar_t *condvar;
85
86
  /**
87
   * TRUE when the service is terminated
88
   */
89
  bool terminated;
90
91
  /**
92
   * Reference counter
93
   */
94
  refcount_t ref;
95
};
96
97
/**
 * Release one reference to the service. When ref_put() reports that this was
 * the final one, close the socket and free all associated resources.
 */
static void destroy_service(private_stream_service_t *this)
{
  if (!ref_put(&this->ref))
  {
    /* somebody else still holds a reference */
    return;
  }
  close(this->fd);
  this->mutex->destroy(this->mutex);
  this->condvar->destroy(this->condvar);
  free(this);
}
107
108
/**
109
 * Data to pass to async accept job
110
 */
111
typedef struct {
112
  /** callback function */
113
  stream_service_cb_t cb;
114
  /** callback data */
115
  void *data;
116
  /** accepted connection */
117
  int fd;
118
  /** reference to stream service */
119
  private_stream_service_t *this;
120
} async_data_t;
121
122
/**
123
 * Forward declaration
124
 */
125
static bool watch(private_stream_service_t *this, int fd, watcher_event_t event);
126
127
/**
128
 * Clean up accept data
129
 */
130
static void destroy_async_data(async_data_t *data)
131
0
{
132
0
  private_stream_service_t *this = data->this;
133
134
0
  this->mutex->lock(this->mutex);
135
0
  if (this->active-- == this->cncrncy && !this->terminated)
136
0
  {
137
    /* leaving concurrency limit, restart accept()ing. */
138
0
    lib->watcher->add(lib->watcher, this->fd,
139
0
              WATCHER_READ, (watcher_cb_t)watch, this);
140
0
  }
141
0
  this->condvar->signal(this->condvar);
142
0
  this->mutex->unlock(this->mutex);
143
0
  destroy_service(this);
144
145
0
  if (data->fd != -1)
146
0
  {
147
0
    close(data->fd);
148
0
  }
149
0
  free(data);
150
0
}
151
152
/**
153
 * Reduce running counter
154
 */
155
CALLBACK(reduce_running, void,
156
  async_data_t *data)
157
0
{
158
0
  private_stream_service_t *this = data->this;
159
160
0
  this->mutex->lock(this->mutex);
161
0
  this->running--;
162
0
  this->condvar->signal(this->condvar);
163
0
  this->mutex->unlock(this->mutex);
164
0
}
165
166
/**
167
 * Async processing of accepted connection
168
 */
169
static job_requeue_t accept_async(async_data_t *data)
170
0
{
171
0
  private_stream_service_t *this = data->this;
172
0
  stream_t *stream;
173
174
0
  this->mutex->lock(this->mutex);
175
0
  if (this->terminated)
176
0
  {
177
0
    this->mutex->unlock(this->mutex);
178
0
    return JOB_REQUEUE_NONE;
179
0
  }
180
0
  this->running++;
181
0
  this->mutex->unlock(this->mutex);
182
183
0
  stream = stream_create_from_fd(data->fd);
184
0
  if (stream)
185
0
  {
186
    /* FD is now owned by stream, don't close it during cleanup */
187
0
    data->fd = -1;
188
0
    thread_cleanup_push(reduce_running, data);
189
0
    thread_cleanup_push((void*)stream->destroy, stream);
190
0
    thread_cleanup_pop(!data->cb(data->data, stream));
191
0
    thread_cleanup_pop(TRUE);
192
0
  }
193
0
  return JOB_REQUEUE_NONE;
194
0
}
195
196
/**
197
 * Watcher callback function
198
 */
199
static bool watch(private_stream_service_t *this, int fd, watcher_event_t event)
200
0
{
201
0
  async_data_t *data;
202
0
  bool keep = TRUE;
203
204
0
  INIT(data,
205
0
    .cb = this->cb,
206
0
    .data = this->data,
207
0
    .fd = accept(fd, NULL, NULL),
208
0
    .this = this,
209
0
  );
210
211
0
  if (data->fd != -1 && !this->terminated)
212
0
  {
213
0
    this->mutex->lock(this->mutex);
214
0
    if (++this->active == this->cncrncy)
215
0
    {
216
      /* concurrency limit reached, stop accept()ing new connections */
217
0
      keep = FALSE;
218
0
    }
219
0
    this->mutex->unlock(this->mutex);
220
0
    ref_get(&this->ref);
221
222
0
    lib->processor->queue_job(lib->processor,
223
0
      (job_t*)callback_job_create_with_prio((void*)accept_async, data,
224
0
        (void*)destroy_async_data, (callback_job_cancel_t)return_false,
225
0
        this->prio));
226
0
  }
227
0
  else
228
0
  {
229
0
    free(data);
230
0
  }
231
0
  return keep;
232
0
}
233
234
/**
 * Register, replace or (with a NULL cb) unregister the accept callback.
 *
 * Does nothing on a terminated service. Otherwise blocks until no accept
 * job is active anymore before the new settings are applied. A priority
 * above JOB_PRIO_MAX leaves the currently configured priority untouched.
 */
METHOD(stream_service_t, on_accept, void,
  private_stream_service_t *this, stream_service_cb_t cb, void *data,
  job_priority_t prio, u_int cncrncy)
{
  this->mutex->lock(this->mutex);

  if (!this->terminated)
  {
    /* let all pending accept jobs finish before changing anything */
    while (this->active != 0)
    {
      this->condvar->wait(this->condvar, this->mutex);
    }

    if (this->cb != NULL)
    {
      /* a previous callback had the socket registered, unregister it */
      lib->watcher->remove(lib->watcher, this->fd);
    }

    this->cb = cb;
    this->data = data;
    this->cncrncy = cncrncy;
    if (prio <= JOB_PRIO_MAX)
    {
      this->prio = prio;
    }

    if (cb != NULL)
    {
      /* start accept()ing connections for the new callback */
      lib->watcher->add(lib->watcher, this->fd,
                WATCHER_READ, (watcher_cb_t)watch, this);
    }
  }

  this->mutex->unlock(this->mutex);
}
273
274
/**
 * Shut down the service: stop watching the socket, flag the service as
 * terminated, wait until the running counter has dropped to zero and then
 * release the owner's reference.
 */
METHOD(stream_service_t, destroy, void,
  private_stream_service_t *this)
{
  mutex_t *lock = this->mutex;

  lock->lock(lock);
  lib->watcher->remove(lib->watcher, this->fd);
  /* jobs that have not passed their termination check yet will bail out */
  this->terminated = TRUE;
  while (this->running != 0)
  {
    this->condvar->wait(this->condvar, lock);
  }
  lock->unlock(lock);

  /* frees the object unless queued jobs still hold references */
  destroy_service(this);
}
287
288
/**
289
 * See header
290
 */
291
stream_service_t *stream_service_create_from_fd(int fd)
292
0
{
293
0
  private_stream_service_t *this;
294
295
0
  INIT(this,
296
0
    .public = {
297
0
      .on_accept = _on_accept,
298
0
      .destroy = _destroy,
299
0
    },
300
0
    .fd = fd,
301
0
    .prio = JOB_PRIO_MEDIUM,
302
0
    .mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
303
0
    .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
304
0
    .ref = 1,
305
0
  );
306
307
0
  return &this->public;
308
0
}