/src/strongswan/src/libstrongswan/networking/streams/stream_service.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (C) 2013 Martin Willi |
3 | | * |
4 | | * Copyright (C) secunet Security Networks AG |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or modify it |
7 | | * under the terms of the GNU General Public License as published by the |
8 | | * Free Software Foundation; either version 2 of the License, or (at your |
9 | | * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. |
10 | | * |
11 | | * This program is distributed in the hope that it will be useful, but |
12 | | * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
13 | | * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
14 | | * for more details. |
15 | | */ |
16 | | |
17 | | #include <library.h> |
18 | | #include <threading/thread.h> |
19 | | #include <threading/mutex.h> |
20 | | #include <threading/condvar.h> |
21 | | #include <processing/jobs/callback_job.h> |
22 | | |
23 | | #include "stream_service.h" |
24 | | |
25 | | #include <errno.h> |
26 | | #include <unistd.h> |
27 | | #include <sys/stat.h> |
28 | | |
29 | | typedef struct private_stream_service_t private_stream_service_t; |
30 | | |
31 | | /** |
32 | | * Private data of an stream_service_t object. |
33 | | */ |
34 | | struct private_stream_service_t { |
35 | | |
36 | | /** |
37 | | * Public stream_service_t interface. |
38 | | */ |
39 | | stream_service_t public; |
40 | | |
41 | | /** |
42 | | * Underlying socket |
43 | | */ |
44 | | int fd; |
45 | | |
46 | | /** |
47 | | * Accept callback |
48 | | */ |
49 | | stream_service_cb_t cb; |
50 | | |
51 | | /** |
52 | | * Accept callback data |
53 | | */ |
54 | | void *data; |
55 | | |
56 | | /** |
57 | | * Job priority to invoke callback with |
58 | | */ |
59 | | job_priority_t prio; |
60 | | |
61 | | /** |
62 | | * Maximum number of parallel callback invocations |
63 | | */ |
64 | | u_int cncrncy; |
65 | | |
66 | | /** |
67 | | * Currently active jobs |
68 | | */ |
69 | | u_int active; |
70 | | |
71 | | /** |
72 | | * Currently running jobs |
73 | | */ |
74 | | u_int running; |
75 | | |
76 | | /** |
77 | | * mutex to lock active counter |
78 | | */ |
79 | | mutex_t *mutex; |
80 | | |
81 | | /** |
82 | | * Condvar to wait for callback termination |
83 | | */ |
84 | | condvar_t *condvar; |
85 | | |
86 | | /** |
87 | | * TRUE when the service is terminated |
88 | | */ |
89 | | bool terminated; |
90 | | |
91 | | /** |
92 | | * Reference counter |
93 | | */ |
94 | | refcount_t ref; |
95 | | }; |
96 | | |
97 | | static void destroy_service(private_stream_service_t *this) |
98 | 0 | { |
99 | 0 | if (ref_put(&this->ref)) |
100 | 0 | { |
101 | 0 | close(this->fd); |
102 | 0 | this->mutex->destroy(this->mutex); |
103 | 0 | this->condvar->destroy(this->condvar); |
104 | 0 | free(this); |
105 | 0 | } |
106 | 0 | } |
107 | | |
108 | | /** |
109 | | * Data to pass to async accept job |
110 | | */ |
111 | | typedef struct { |
112 | | /** callback function */ |
113 | | stream_service_cb_t cb; |
114 | | /** callback data */ |
115 | | void *data; |
116 | | /** accepted connection */ |
117 | | int fd; |
118 | | /** reference to stream service */ |
119 | | private_stream_service_t *this; |
120 | | } async_data_t; |
121 | | |
122 | | /** |
123 | | * Forward declaration |
124 | | */ |
125 | | static bool watch(private_stream_service_t *this, int fd, watcher_event_t event); |
126 | | |
127 | | /** |
128 | | * Clean up accept data |
129 | | */ |
130 | | static void destroy_async_data(async_data_t *data) |
131 | 0 | { |
132 | 0 | private_stream_service_t *this = data->this; |
133 | |
|
134 | 0 | this->mutex->lock(this->mutex); |
135 | 0 | if (this->active-- == this->cncrncy && !this->terminated) |
136 | 0 | { |
137 | | /* leaving concurrency limit, restart accept()ing. */ |
138 | 0 | lib->watcher->add(lib->watcher, this->fd, |
139 | 0 | WATCHER_READ, (watcher_cb_t)watch, this); |
140 | 0 | } |
141 | 0 | this->condvar->signal(this->condvar); |
142 | 0 | this->mutex->unlock(this->mutex); |
143 | 0 | destroy_service(this); |
144 | |
|
145 | 0 | if (data->fd != -1) |
146 | 0 | { |
147 | 0 | close(data->fd); |
148 | 0 | } |
149 | 0 | free(data); |
150 | 0 | } |
151 | | |
152 | | /** |
153 | | * Reduce running counter |
154 | | */ |
155 | | CALLBACK(reduce_running, void, |
156 | | async_data_t *data) |
157 | 0 | { |
158 | 0 | private_stream_service_t *this = data->this; |
159 | |
|
160 | 0 | this->mutex->lock(this->mutex); |
161 | 0 | this->running--; |
162 | 0 | this->condvar->signal(this->condvar); |
163 | 0 | this->mutex->unlock(this->mutex); |
164 | 0 | } |
165 | | |
166 | | /** |
167 | | * Async processing of accepted connection |
168 | | */ |
169 | | static job_requeue_t accept_async(async_data_t *data) |
170 | 0 | { |
171 | 0 | private_stream_service_t *this = data->this; |
172 | 0 | stream_t *stream; |
173 | |
|
174 | 0 | this->mutex->lock(this->mutex); |
175 | 0 | if (this->terminated) |
176 | 0 | { |
177 | 0 | this->mutex->unlock(this->mutex); |
178 | 0 | return JOB_REQUEUE_NONE; |
179 | 0 | } |
180 | 0 | this->running++; |
181 | 0 | this->mutex->unlock(this->mutex); |
182 | |
|
183 | 0 | stream = stream_create_from_fd(data->fd); |
184 | 0 | if (stream) |
185 | 0 | { |
186 | | /* FD is now owned by stream, don't close it during cleanup */ |
187 | 0 | data->fd = -1; |
188 | 0 | thread_cleanup_push(reduce_running, data); |
189 | 0 | thread_cleanup_push((void*)stream->destroy, stream); |
190 | 0 | thread_cleanup_pop(!data->cb(data->data, stream)); |
191 | 0 | thread_cleanup_pop(TRUE); |
192 | 0 | } |
193 | 0 | return JOB_REQUEUE_NONE; |
194 | 0 | } |
195 | | |
196 | | /** |
197 | | * Watcher callback function |
198 | | */ |
199 | | static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) |
200 | 0 | { |
201 | 0 | async_data_t *data; |
202 | 0 | bool keep = TRUE; |
203 | |
|
204 | 0 | INIT(data, |
205 | 0 | .cb = this->cb, |
206 | 0 | .data = this->data, |
207 | 0 | .fd = accept(fd, NULL, NULL), |
208 | 0 | .this = this, |
209 | 0 | ); |
210 | |
|
211 | 0 | if (data->fd != -1 && !this->terminated) |
212 | 0 | { |
213 | 0 | this->mutex->lock(this->mutex); |
214 | 0 | if (++this->active == this->cncrncy) |
215 | 0 | { |
216 | | /* concurrency limit reached, stop accept()ing new connections */ |
217 | 0 | keep = FALSE; |
218 | 0 | } |
219 | 0 | this->mutex->unlock(this->mutex); |
220 | 0 | ref_get(&this->ref); |
221 | |
|
222 | 0 | lib->processor->queue_job(lib->processor, |
223 | 0 | (job_t*)callback_job_create_with_prio((void*)accept_async, data, |
224 | 0 | (void*)destroy_async_data, (callback_job_cancel_t)return_false, |
225 | 0 | this->prio)); |
226 | 0 | } |
227 | 0 | else |
228 | 0 | { |
229 | 0 | free(data); |
230 | 0 | } |
231 | 0 | return keep; |
232 | 0 | } |
233 | | |
234 | | METHOD(stream_service_t, on_accept, void, |
235 | | private_stream_service_t *this, stream_service_cb_t cb, void *data, |
236 | | job_priority_t prio, u_int cncrncy) |
237 | 0 | { |
238 | 0 | this->mutex->lock(this->mutex); |
239 | |
|
240 | 0 | if (this->terminated) |
241 | 0 | { |
242 | 0 | this->mutex->unlock(this->mutex); |
243 | 0 | return; |
244 | 0 | } |
245 | | |
246 | | /* wait for all callbacks to return */ |
247 | 0 | while (this->active) |
248 | 0 | { |
249 | 0 | this->condvar->wait(this->condvar, this->mutex); |
250 | 0 | } |
251 | |
|
252 | 0 | if (this->cb) |
253 | 0 | { |
254 | 0 | lib->watcher->remove(lib->watcher, this->fd); |
255 | 0 | } |
256 | |
|
257 | 0 | this->cb = cb; |
258 | 0 | this->data = data; |
259 | 0 | if (prio <= JOB_PRIO_MAX) |
260 | 0 | { |
261 | 0 | this->prio = prio; |
262 | 0 | } |
263 | 0 | this->cncrncy = cncrncy; |
264 | |
|
265 | 0 | if (this->cb) |
266 | 0 | { |
267 | 0 | lib->watcher->add(lib->watcher, this->fd, |
268 | 0 | WATCHER_READ, (watcher_cb_t)watch, this); |
269 | 0 | } |
270 | |
|
271 | 0 | this->mutex->unlock(this->mutex); |
272 | 0 | } |
273 | | |
274 | | METHOD(stream_service_t, destroy, void, |
275 | | private_stream_service_t *this) |
276 | 0 | { |
277 | 0 | this->mutex->lock(this->mutex); |
278 | 0 | lib->watcher->remove(lib->watcher, this->fd); |
279 | 0 | this->terminated = TRUE; |
280 | 0 | while (this->running) |
281 | 0 | { |
282 | 0 | this->condvar->wait(this->condvar, this->mutex); |
283 | 0 | } |
284 | 0 | this->mutex->unlock(this->mutex); |
285 | 0 | destroy_service(this); |
286 | 0 | } |
287 | | |
288 | | /** |
289 | | * See header |
290 | | */ |
291 | | stream_service_t *stream_service_create_from_fd(int fd) |
292 | 0 | { |
293 | 0 | private_stream_service_t *this; |
294 | |
|
295 | 0 | INIT(this, |
296 | 0 | .public = { |
297 | 0 | .on_accept = _on_accept, |
298 | 0 | .destroy = _destroy, |
299 | 0 | }, |
300 | 0 | .fd = fd, |
301 | 0 | .prio = JOB_PRIO_MEDIUM, |
302 | 0 | .mutex = mutex_create(MUTEX_TYPE_RECURSIVE), |
303 | 0 | .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), |
304 | 0 | .ref = 1, |
305 | 0 | ); |
306 | |
|
307 | 0 | return &this->public; |
308 | 0 | } |