/src/open62541/arch/posix/eventloop_posix.c
Line | Count | Source |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
6 | | * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) |
7 | | */ |
8 | | |
9 | | #include "eventloop_posix.h" |
10 | | #include "open62541/plugin/eventloop.h" |
11 | | |
12 | | #if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32) |
13 | | |
14 | | /*********/ |
15 | | /* Timer */ |
16 | | /*********/ |
17 | | |
static UA_DateTime
UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) {
    /* Return the monotonic time of the next due event. If the delayed-callback
     * queue is non-empty, report "now" so the caller does not sleep. */
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    /* One of the two heads always holds the sentinel (0x01). A value above the
     * sentinel is a real UA_DelayedCallback pointer -> queue non-empty. */
    if(el->delayedHead1 > (UA_DelayedCallback *)0x01 ||
       el->delayedHead2 > (UA_DelayedCallback *)0x01)
        return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
    return UA_Timer_next(&el->timer);
}
26 | | |
27 | | static UA_StatusCode |
28 | | UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb, |
29 | | void *application, void *data, UA_Double interval_ms, |
30 | | UA_DateTime *baseTime, UA_TimerPolicy timerPolicy, |
31 | 0 | UA_UInt64 *callbackId) { |
32 | 0 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
33 | 0 | return UA_Timer_add(&el->timer, cb, application, data, interval_ms, |
34 | 0 | public_el->dateTime_nowMonotonic(public_el), |
35 | 0 | baseTime, timerPolicy, callbackId); |
36 | 0 | } |
37 | | |
38 | | static UA_StatusCode |
39 | | UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el, |
40 | | UA_UInt64 callbackId, |
41 | | UA_Double interval_ms, |
42 | | UA_DateTime *baseTime, |
43 | 0 | UA_TimerPolicy timerPolicy) { |
44 | 0 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
45 | 0 | return UA_Timer_modify(&el->timer, callbackId, interval_ms, |
46 | 0 | public_el->dateTime_nowMonotonic(public_el), |
47 | 0 | baseTime, timerPolicy); |
48 | 0 | } |
49 | | |
50 | | static void |
51 | | UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el, |
52 | 0 | UA_UInt64 callbackId) { |
53 | 0 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
54 | 0 | UA_Timer_remove(&el->timer, callbackId); |
55 | 0 | } |
56 | | |
void
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
                                     UA_DelayedCallback *dc) {
    /* Lock-free enqueue of a delayed callback. Safe to call from any thread,
     * including concurrently with the EventLoop iteration that drains the
     * queue. The callback executes in a later EventLoop iteration. */
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    dc->next = NULL;

    /* el->delayedTail points either to prev->next or to the head.
     * We need to update two locations:
     * 1: el->delayedTail = &dc->next;
     * 2: *oldtail = dc; (equal to &dc->next)
     *
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
     * about (2), we can adjust it with a delay. This makes the queue
     * "eventually consistent". */
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
    /* NOTE(review): the value stored through oldtail is &dc->next, which is
     * read back elsewhere as a UA_DelayedCallback* -- this relies on "next"
     * being the first member of UA_DelayedCallback. TODO confirm layout. */
    UA_atomic_xchg((void**)oldtail, &dc->next);
}
75 | | |
76 | | /* Resets the delayed queue and returns the previous head and tail */ |
static void
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
                  UA_DelayedCallback **oldTail) {
    /* Atomically detach the current delayed-callback list and return its head
     * and tail via the out-parameters. The outputs are untouched when the
     * queue is empty, so callers must pre-initialize them to NULL.
     * Invariant: one of the two heads holds the sentinel 0x01; the other is
     * NULL (empty) or points to the first queued element. */
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
        return; /* The queue is empty */

    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;

    /* Switch active/inactive by resetting the sentinel values. The (old) active
     * head points to an element which we return. Parallel threads continue to
     * add elements to the queue "below" the first element. */
    UA_atomic_xchg((void**)inactiveHead, NULL);
    *oldHead = (UA_DelayedCallback *)
        UA_atomic_xchg((void**)activeHead, (void*)0x01);

    /* Make the tail point to the (new) active head. Return the value of last
     * tail. When iterating over the queue elements, we need to find this tail
     * as the last element. If we find a NULL next-pointer before hitting the
     * tail spinlock until the pointer updates (eventually consistent). */
    *oldTail = (UA_DelayedCallback*)
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
}
102 | | |
static void
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
                                        UA_DelayedCallback *dc) {
    /* Remove a pending delayed callback: drain the whole queue and re-enqueue
     * every entry except dc. Runs under the EventLoop mutex so no concurrent
     * drain happens; producers may still append concurrently. */
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    UA_LOCK(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *cur = NULL, *tail = NULL;
    resetDelayedQueue(el, &cur, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; cur; cur = next) {
        /* Spin-loop until the next-pointer of cur is updated.
         * The element pointed to by tail must appear eventually. */
        next = cur->next;
        while(!next && cur != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
        if(cur == dc)
            continue; /* Skip (drop) the entry to be removed */
        /* Everything else goes back into the (now inactive-side) queue */
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
    }

    UA_UNLOCK(&el->elMutex);
}
128 | | |
static void
processDelayed(UA_EventLoopPOSIX *el) {
    /* Execute all delayed callbacks queued so far. Callbacks may enqueue new
     * delayed callbacks; those land in the swapped (other) head and run in a
     * later call. Caller must hold the EventLoop mutex. */
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Process delayed callbacks");

    UA_LOCK_ASSERT(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *dc = NULL, *tail = NULL;
    resetDelayedQueue(el, &dc, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; dc; dc = next) {
        /* Spin until the eventually-consistent next-pointer becomes visible
         * (a producer may still be finishing its second xchg) */
        next = dc->next;
        while(!next && dc != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
        if(!dc->callback)
            continue; /* Entry without a callback: used only as a marker */
        /* The callback may free dc -- do not touch dc afterwards */
        dc->callback(dc->application, dc->context);
    }
}
151 | | |
152 | | /***********************/ |
153 | | /* EventLoop Lifecycle */ |
154 | | /***********************/ |
155 | | |
156 | | static UA_StatusCode |
157 | 0 | UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) { |
158 | 0 | UA_LOCK(&el->elMutex); |
159 | |
|
160 | 0 | if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH && |
161 | 0 | el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) { |
162 | 0 | UA_UNLOCK(&el->elMutex); |
163 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
164 | 0 | } |
165 | | |
166 | 0 | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
167 | 0 | "Starting the EventLoop"); |
168 | | |
169 | | /* Setting custom clock source */ |
170 | 0 | const UA_Int32 *cs = (const UA_Int32*) |
171 | 0 | UA_KeyValueMap_getScalar(&el->eventLoop.params, |
172 | 0 | UA_QUALIFIEDNAME(0, "clock-source"), |
173 | 0 | &UA_TYPES[UA_TYPES_INT32]); |
174 | 0 | if(cs) |
175 | 0 | el->clockSource = *cs; |
176 | |
|
177 | 0 | const UA_Int32 *csm = (const UA_Int32*) |
178 | 0 | UA_KeyValueMap_getScalar(&el->eventLoop.params, |
179 | 0 | UA_QUALIFIEDNAME(0, "clock-source-monotonic"), |
180 | 0 | &UA_TYPES[UA_TYPES_INT32]); |
181 | 0 | if(csm) { |
182 | 0 | if(el->clockSourceMonotonic != *csm && el->timer.idTree.root) { |
183 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
184 | 0 | "Eventloop\t| Setting a different monotonic clock, ", |
185 | 0 | "but existing timers have been registered with a " |
186 | 0 | "different clock source"); |
187 | 0 | } |
188 | 0 | el->clockSourceMonotonic = *csm; |
189 | 0 | } |
190 | |
|
191 | | #if !defined(UA_ARCHITECTURE_POSIX) |
192 | | if(cs || csm) { |
193 | | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
194 | | "Eventloop\t| Setting a different clock source ", |
195 | | "not supported for this architecture"); |
196 | | } |
197 | | #endif |
198 | | |
199 | | /* Create the self-pipe */ |
200 | 0 | int err = UA_EventLoopPOSIX_pipe(el->selfpipe); |
201 | 0 | if(err != 0) { |
202 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
203 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
204 | 0 | "Eventloop\t| Could not create the self-pipe (%s)", |
205 | 0 | errno_str)); |
206 | 0 | UA_UNLOCK(&el->elMutex); |
207 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
208 | 0 | } |
209 | | |
210 | | /* Create the epoll socket */ |
211 | 0 | #ifdef UA_HAVE_EPOLL |
212 | 0 | el->epollfd = epoll_create1(0); |
213 | 0 | if(el->epollfd == -1) { |
214 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
215 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
216 | 0 | "Eventloop\t| Could not create the epoll socket (%s)", |
217 | 0 | errno_str)); |
218 | 0 | UA_close(el->selfpipe[0]); |
219 | 0 | UA_close(el->selfpipe[1]); |
220 | 0 | UA_UNLOCK(&el->elMutex); |
221 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
222 | 0 | } |
223 | | |
224 | | /* epoll always listens on the self-pipe. This is the only epoll_event that |
225 | | * has a NULL data pointer. */ |
226 | 0 | struct epoll_event event; |
227 | 0 | memset(&event, 0, sizeof(struct epoll_event)); |
228 | 0 | event.events = EPOLLIN; |
229 | 0 | err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event); |
230 | 0 | if(err != 0) { |
231 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
232 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
233 | 0 | "Eventloop\t| Could not register the self-pipe for epoll (%s)", |
234 | 0 | errno_str)); |
235 | 0 | UA_close(el->selfpipe[0]); |
236 | 0 | UA_close(el->selfpipe[1]); |
237 | 0 | close(el->epollfd); |
238 | 0 | UA_UNLOCK(&el->elMutex); |
239 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
240 | 0 | } |
241 | 0 | #endif |
242 | | |
243 | | /* Start the EventSources */ |
244 | 0 | UA_StatusCode res = UA_STATUSCODE_GOOD; |
245 | 0 | UA_EventSource *es = el->eventLoop.eventSources; |
246 | 0 | while(es) { |
247 | 0 | res |= es->start(es); |
248 | 0 | es = es->next; |
249 | 0 | } |
250 | | |
251 | | /* Dirty-write the state that is const "from the outside" */ |
252 | 0 | *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state = |
253 | 0 | UA_EVENTLOOPSTATE_STARTED; |
254 | |
|
255 | 0 | UA_UNLOCK(&el->elMutex); |
256 | 0 | return res; |
257 | 0 | } |
258 | | |
static void
checkClosed(UA_EventLoopPOSIX *el) {
    /* If all EventSources are stopped and no delayed callbacks are pending,
     * close the self-pipe (and epoll fd) and transition to STOPPED.
     * Caller must hold the EventLoop mutex. */
    UA_LOCK_ASSERT(&el->elMutex);

    UA_EventSource *es = el->eventLoop.eventSources;
    while(es) {
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
            return; /* At least one EventSource still shutting down */
        es = es->next;
    }

    /* Not closed until all delayed callbacks are processed.
     * One head always holds the sentinel 0x01 (non-NULL); the other is NULL
     * when empty. Both non-NULL therefore means the queue has entries. */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        return;

    /* Close the self-pipe when everything else is done */
    UA_close(el->selfpipe[0]);
    UA_close(el->selfpipe[1]);

    /* Dirty-write the state that is const "from the outside" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STOPPED;

    /* Close the epoll/IOCP socket once all EventSources have shut down */
#ifdef UA_HAVE_EPOLL
    UA_close(el->epollfd);
#endif

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "The EventLoop has stopped");
}
290 | | |
static void
UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) {
    /* Initiate an (asynchronous) shutdown. The EventLoop reaches STOPPED once
     * every EventSource has stopped and all delayed callbacks have run --
     * either immediately below (checkClosed) or in a later run() iteration. */
    UA_LOCK(&el->elMutex);

    /* Stopping is only allowed from the STARTED state */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "The EventLoop is not running, cannot be stopped");
        UA_UNLOCK(&el->elMutex);
        return;
    }

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Stopping the EventLoop");

    /* Set to STOPPING to prevent "normal use" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STOPPING;

    /* Stop all event sources (asynchronous) */
    UA_EventSource *es = el->eventLoop.eventSources;
    for(; es; es = es->next) {
        if(es->state == UA_EVENTSOURCESTATE_STARTING ||
           es->state == UA_EVENTSOURCESTATE_STARTED) {
            es->stop(es);
        }
    }

    /* Set to STOPPED if all EventSources are STOPPED */
    checkClosed(el);

    UA_UNLOCK(&el->elMutex);
}
323 | | |
static UA_StatusCode
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
    /* One EventLoop iteration: run due cyclic callbacks, process delayed
     * callbacks, then poll the registered sockets for at most `timeout`
     * milliseconds (shortened so the next cyclic callback fires on time).
     * Not reentrant: returns BADINTERNALERROR when called from within itself
     * or on a stopped loop. */
    UA_LOCK(&el->elMutex);

    if(el->executing) {
        UA_LOG_ERROR(el->eventLoop.logger,
                     UA_LOGCATEGORY_EVENTLOOP,
                     "Cannot run EventLoop from the run method itself");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    el->executing = true;

    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot run a stopped EventLoop");
        el->executing = false;
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Iterate the EventLoop");

    /* Process cyclic callbacks */
    UA_DateTime dateBefore =
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);

    /* dateNext is the monotonic time when the next cyclic callback is due */
    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);

    /* Process delayed callbacks here:
     * - Removes closed sockets already here instead of polling them again.
     * - The timeout for polling is selected to be ready in time for the next
     *   cyclic callback. So we want to do little work between the timeout
     *   running out and executing the due cyclic callbacks. */
    processDelayed(el);

    /* A delayed callback could create another delayed callback (or re-add
     * itself). In that case we don't want to wait (indefinitely) for an event
     * to happen. Process queued events but don't sleep. Then process the
     * delayed callbacks in the next iteration.
     * (Both heads non-NULL <=> delayed queue non-empty, see checkClosed.) */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        timeout = 0;

    /* Compute the remaining time: poll until the next cyclic callback is due,
     * but never longer than the caller-supplied timeout */
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
    if(dateNext > maxDate)
        dateNext = maxDate;
    UA_DateTime listenTimeout =
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
    if(listenTimeout < 0)
        listenTimeout = 0; /* Already overdue -> poll without blocking */

    /* Listen on the active file-descriptors (sockets) from the
     * ConnectionManagers */
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);

    /* Check if the last EventSource was successfully stopped */
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
        checkClosed(el);

    el->executing = false;
    UA_UNLOCK(&el->elMutex);
    return rv;
}
391 | | |
392 | | /*****************************/ |
393 | | /* Registering Event Sources */ |
394 | | /*****************************/ |
395 | | |
static UA_StatusCode
UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el,
                                      UA_EventSource *es) {
    /* Attach an EventSource to this EventLoop. The EventSource must be FRESH
     * (not registered elsewhere). It is started right away if the EventLoop
     * is already running. */
    UA_LOCK(&el->elMutex);

    /* Already registered? */
    if(es->state != UA_EVENTSOURCESTATE_FRESH) {
        UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                     "Cannot register the EventSource \"%.*s\": "
                     "already registered",
                     (int)es->name.length, (char*)es->name.data);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Add to linked list (prepended at the head) */
    es->next = el->eventLoop.eventSources;
    el->eventLoop.eventSources = es;

    es->eventLoop = &el->eventLoop;
    es->state = UA_EVENTSOURCESTATE_STOPPED;

    /* Start if the entire EventLoop is started */
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED)
        res = es->start(es);

    UA_UNLOCK(&el->elMutex);
    return res;
}
426 | | |
427 | | static UA_StatusCode |
428 | | UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el, |
429 | 0 | UA_EventSource *es) { |
430 | 0 | UA_LOCK(&el->elMutex); |
431 | |
|
432 | 0 | if(es->state != UA_EVENTSOURCESTATE_STOPPED) { |
433 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
434 | 0 | "Cannot deregister the EventSource %.*s: " |
435 | 0 | "Has to be stopped first", |
436 | 0 | (int)es->name.length, es->name.data); |
437 | 0 | UA_UNLOCK(&el->elMutex); |
438 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
439 | 0 | } |
440 | | |
441 | | /* Remove from the linked list */ |
442 | 0 | UA_EventSource **s = &el->eventLoop.eventSources; |
443 | 0 | while(*s) { |
444 | 0 | if(*s == es) { |
445 | 0 | *s = es->next; |
446 | 0 | break; |
447 | 0 | } |
448 | 0 | s = &(*s)->next; |
449 | 0 | } |
450 | | |
451 | | /* Set the state to non-registered */ |
452 | 0 | es->state = UA_EVENTSOURCESTATE_FRESH; |
453 | |
|
454 | 0 | UA_UNLOCK(&el->elMutex); |
455 | 0 | return UA_STATUSCODE_GOOD; |
456 | 0 | } |
457 | | |
458 | | /***************/ |
459 | | /* Time Domain */ |
460 | | /***************/ |
461 | | |
static UA_DateTime
UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) {
    /* Wall-clock time from the configurable clock source (default
     * CLOCK_REALTIME), converted to UA_DateTime (100ns ticks since 1601). */
#if defined(UA_ARCHITECTURE_POSIX)
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
    struct timespec ts;
    int res = clock_gettime(pel->clockSource, &ts);
    if(UA_UNLIKELY(res != 0))
        return 0; /* Clock source unavailable */
    /* NOTE(review): assumes tv_sec * UA_DATETIME_SEC is evaluated in 64 bit
     * (UA_DATETIME_SEC presumably a 64-bit constant) -- confirm on targets
     * with 32-bit time_t */
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
#else
    return UA_DateTime_now();
#endif
}
475 | | |
static UA_DateTime
UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) {
    /* Monotonic time from the configurable clock source (default
     * CLOCK_MONOTONIC_RAW / CLOCK_MONOTONIC), as UA_DateTime ticks. */
#if defined(UA_ARCHITECTURE_POSIX)
    UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el;
    struct timespec ts;
    int res = clock_gettime(pel->clockSourceMonotonic, &ts);
    if(UA_UNLIKELY(res != 0))
        return 0; /* Clock source unavailable */
    /* Also add the unix epoch for the monotonic clock. So we get a "normal"
     * output when a "normal" source is configured. */
    return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH;
#else
    return UA_DateTime_nowMonotonic();
#endif
}
491 | | |
492 | | static UA_Int64 |
493 | 0 | UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) { |
494 | | /* TODO: Fix for custom clock sources */ |
495 | 0 | return UA_DateTime_localTimeUtcOffset(); |
496 | 0 | } |
497 | | |
498 | | /*************************/ |
499 | | /* Initialize and Delete */ |
500 | | /*************************/ |
501 | | |
static UA_StatusCode
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
    /* Delete the EventLoop and everything it owns. Only allowed when the loop
     * is STOPPED or was never started (FRESH). Frees all registered
     * EventSources, drops remaining timers and runs outstanding delayed
     * callbacks before releasing the EventLoop memory itself. */
    UA_LOCK(&el->elMutex);

    /* Check if the EventLoop can be deleted */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot delete a running EventLoop");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Deregister and delete all the EventSources */
    while(el->eventLoop.eventSources) {
        UA_EventSource *es = el->eventLoop.eventSources;
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
        es->free(es);
    }

    /* Remove the repeated timed callbacks */
    UA_Timer_clear(&el->timer);

    /* Process remaining delayed callbacks */
    processDelayed(el);

#ifdef UA_ARCHITECTURE_WIN32
    /* Stop the Windows networking subsystem (balances the WSAStartup in
     * UA_EventLoop_new_POSIX) */
    WSACleanup();
#endif

    UA_KeyValueMap_clear(&el->eventLoop.params);

    /* Clean up: unlock before destroying the mutex and freeing el */
    UA_UNLOCK(&el->elMutex);
    UA_LOCK_DESTROY(&el->elMutex);
    UA_free(el);
    return UA_STATUSCODE_GOOD;
}
541 | | |
542 | | static void |
543 | 0 | UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) { |
544 | 0 | UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex); |
545 | 0 | } |
546 | | static void |
547 | 0 | UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) { |
548 | 0 | UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex); |
549 | 0 | } |
550 | | |
UA_EventLoop *
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
    /* Allocate and initialize a POSIX/Win32 EventLoop. Returns the public
     * UA_EventLoop interface (embedded in UA_EventLoopPOSIX) or NULL on
     * allocation failure. Ownership passes to the caller; release with
     * eventLoop->free(). */
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
    if(!el)
        return NULL;

    UA_LOCK_INIT(&el->elMutex);
    UA_Timer_init(&el->timer);

    /* Initialize the delayed-callback queue: head1 starts NULL (from calloc)
     * as the active side, head2 holds the sentinel, the tail points at the
     * active head */
    el->delayedTail = &el->delayedHead1;
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */

#ifdef UA_ARCHITECTURE_WIN32
    /* Start the WSA networking subsystem on Windows */
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

    /* Set the public EventLoop content */
    el->eventLoop.logger = logger;

    /* Initialize the clock source to the default */
#if defined(UA_ARCHITECTURE_POSIX)
    el->clockSource = CLOCK_REALTIME;
# ifdef CLOCK_MONOTONIC_RAW
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
# else
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
# endif
#endif

    /* Set the method pointers for the interface */
    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;

    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
    el->eventLoop.dateTime_nowMonotonic =
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
    el->eventLoop.dateTime_localTimeUtcOffset =
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;

    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;

    el->eventLoop.registerEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_registerEventSource;
    el->eventLoop.deregisterEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_deregisterEventSource;

    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;

    return &el->eventLoop;
}
616 | | |
617 | | /***************************/ |
618 | | /* Network Buffer Handling */ |
619 | | /***************************/ |
620 | | |
621 | | UA_StatusCode |
622 | | UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, |
623 | | uintptr_t connectionId, |
624 | | UA_ByteString *buf, |
625 | 0 | size_t bufSize) { |
626 | 0 | UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; |
627 | 0 | if(pcm->txBuffer.length == 0) |
628 | 0 | return UA_ByteString_allocBuffer(buf, bufSize); |
629 | 0 | if(pcm->txBuffer.length < bufSize) |
630 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
631 | 0 | *buf = pcm->txBuffer; |
632 | 0 | buf->length = bufSize; |
633 | 0 | return UA_STATUSCODE_GOOD; |
634 | 0 | } |
635 | | |
636 | | void |
637 | | UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, |
638 | | uintptr_t connectionId, |
639 | 0 | UA_ByteString *buf) { |
640 | 0 | UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; |
641 | 0 | if(pcm->txBuffer.data == buf->data) |
642 | 0 | UA_ByteString_init(buf); |
643 | 0 | else |
644 | 0 | UA_ByteString_clear(buf); |
645 | 0 | } |
646 | | |
UA_StatusCode
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
    /* (Re-)allocate the static rx buffer (size from the "recv-bufsize"
     * parameter) and, if "send-bufsize" is configured, the static tx buffer.
     * Existing buffers of a different size are replaced. */
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 128 KiB (2 << 16);
                                     * the old comment claimed 64kb */
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(configRxBufSize)
        rxBufSize = *configRxBufSize;
    if(pcm->rxBuffer.length != rxBufSize) {
        UA_ByteString_clear(&pcm->rxBuffer);
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
    }

    /* The tx buffer stays empty (heap allocation per send) unless configured */
    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
        UA_ByteString_clear(&pcm->txBuffer);
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
    }
    return res;
}
672 | | |
673 | | /******************/ |
674 | | /* Socket Options */ |
675 | | /******************/ |
676 | | |
677 | | enum ZIP_CMP |
678 | 0 | cmpFD(const UA_FD *a, const UA_FD *b) { |
679 | 0 | if(*a == *b) |
680 | 0 | return ZIP_CMP_EQ; |
681 | 0 | return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE; |
682 | 0 | } |
683 | | |
UA_StatusCode
UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) {
    /* Put the socket into non-blocking mode (fcntl O_NONBLOCK on POSIX,
     * FIONBIO on Windows). Returns BADINTERNALERROR on failure. */
#ifndef UA_ARCHITECTURE_WIN32
    int opts = fcntl(sockfd, F_GETFL);
    if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0)
        return UA_STATUSCODE_BADINTERNALERROR;
#else
    u_long iMode = 1;
    if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR)
        return UA_STATUSCODE_BADINTERNALERROR;
#endif
    return UA_STATUSCODE_GOOD;
}
697 | | |
UA_StatusCode
UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) {
    /* Suppress SIGPIPE on writes to a closed peer where the platform supports
     * the SO_NOSIGPIPE socket option (e.g. macOS/BSD). No-op elsewhere. */
#ifdef SO_NOSIGPIPE
    int val = 1;
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val));
    if(res < 0)
        return UA_STATUSCODE_BADINTERNALERROR;
#endif
    return UA_STATUSCODE_GOOD;
}
708 | | |
UA_StatusCode
UA_EventLoopPOSIX_setReusable(UA_FD sockfd) {
    /* Allow fast rebinding of the address/port: SO_REUSEADDR everywhere,
     * additionally SO_REUSEPORT on POSIX (not available on Win32). */
    int enableReuseVal = 1;
#ifndef UA_ARCHITECTURE_WIN32
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
    res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
                         (const char*)&enableReuseVal, sizeof(enableReuseVal));
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
#else
    int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                            (const char*)&enableReuseVal, sizeof(enableReuseVal));
    return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR;
#endif
}
724 | | |
725 | | /************************/ |
726 | | /* Select / epoll Logic */ |
727 | | /************************/ |
728 | | |
729 | | /* Re-arm the self-pipe socket for the next signal by reading from it */ |
/* Re-arm the self-pipe socket for the next signal by reading from it */
static void
flushSelfPipe(UA_SOCKET s) {
    /* Drain all pending bytes; the pipe is non-blocking, so read returns
     * <= 0 once empty. On Win32 a single recv is used. */
    char buf[128];
#ifdef UA_ARCHITECTURE_WIN32
    recv(s, buf, 128, 0);
#else
    ssize_t i;
    do {
        i = read(s, buf, 128);
    } while(i > 0);
#endif
}
742 | | |
743 | | #if !defined(UA_HAVE_EPOLL) |
744 | | |
UA_StatusCode
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
    /* select()-backend: append rfd to the flat array of registered fds.
     * Returns BADOUTOFMEMORY if the array cannot grow. Caller must hold the
     * EventLoop mutex. */
    UA_LOCK_ASSERT(&el->elMutex);
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Registering fd: %u", (unsigned)rfd->fd);

    /* Realloc (el->fds stays valid if realloc fails) */
    UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
        UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1));
    if(!fds_tmp) {
        return UA_STATUSCODE_BADOUTOFMEMORY;
    }
    el->fds = fds_tmp;

    /* Add to the last entry */
    el->fds[el->fdsSize] = rfd;
    el->fdsSize++;
    return UA_STATUSCODE_GOOD;
}
764 | | |
765 | | UA_StatusCode |
766 | | UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
767 | | /* Do nothing, it is enough if the data was changed in the rfd */ |
768 | | UA_LOCK_ASSERT(&el->elMutex); |
769 | | return UA_STATUSCODE_GOOD; |
770 | | } |
771 | | |
void
UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) {
    /* select()-backend: remove rfd from the flat array via swap-with-last.
     * No-op if rfd is not registered. Caller must hold the EventLoop mutex. */
    UA_LOCK_ASSERT(&el->elMutex);
    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Unregistering fd: %u", (unsigned)rfd->fd);

    /* Find the entry */
    size_t i = 0;
    for(; i < el->fdsSize; i++) {
        if(el->fds[i] == rfd)
            break;
    }

    /* Not found? */
    if(i == el->fdsSize)
        return;

    if(el->fdsSize > 1) {
        /* Move the last entry in the ith slot and realloc. */
        el->fdsSize--;
        el->fds[i] = el->fds[el->fdsSize];
        UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**)
            UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize);
        /* if realloc fails the fds are still in a correct state with
         * possibly lost memory, so failing silently here is ok */
        if(fds_tmp)
            el->fds = fds_tmp;
    } else {
        /* Remove the last entry */
        UA_free(el->fds);
        el->fds = NULL;
        el->fdsSize = 0;
    }
}
806 | | |
807 | | static UA_FD |
808 | | setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) { |
809 | | UA_LOCK_ASSERT(&el->elMutex); |
810 | | |
811 | | FD_ZERO(readset); |
812 | | FD_ZERO(writeset); |
813 | | FD_ZERO(errset); |
814 | | |
815 | | /* Always listen on the read-end of the pipe */ |
816 | | UA_FD highestfd = el->selfpipe[0]; |
817 | | FD_SET(el->selfpipe[0], readset); |
818 | | |
819 | | for(size_t i = 0; i < el->fdsSize; i++) { |
820 | | UA_FD currentFD = el->fds[i]->fd; |
821 | | |
822 | | /* Add to the fd_sets */ |
823 | | if(el->fds[i]->listenEvents & UA_FDEVENT_IN) |
824 | | FD_SET(currentFD, readset); |
825 | | if(el->fds[i]->listenEvents & UA_FDEVENT_OUT) |
826 | | FD_SET(currentFD, writeset); |
827 | | |
828 | | /* Always return errors */ |
829 | | FD_SET(currentFD, errset); |
830 | | |
831 | | /* Highest fd? */ |
832 | | if(currentFD > highestfd) |
833 | | highestfd = currentFD; |
834 | | } |
835 | | return highestfd; |
836 | | } |
837 | | |
838 | | UA_StatusCode |
839 | | UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) { |
840 | | UA_assert(listenTimeout >= 0); |
841 | | UA_LOCK_ASSERT(&el->elMutex); |
842 | | |
843 | | fd_set readset, writeset, errset; |
844 | | UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset); |
845 | | |
846 | | /* Nothing to do? */ |
847 | | if(highestfd == UA_INVALID_FD) { |
848 | | UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
849 | | "No valid FDs for processing"); |
850 | | return UA_STATUSCODE_GOOD; |
851 | | } |
852 | | |
853 | | struct timeval tmptv = { |
854 | | #ifndef UA_ARCHITECTURE_WIN32 |
855 | | (time_t)(listenTimeout / UA_DATETIME_SEC), |
856 | | (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC) |
857 | | #else |
858 | | (long)(listenTimeout / UA_DATETIME_SEC), |
859 | | (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC) |
860 | | #endif |
861 | | }; |
862 | | |
863 | | UA_UNLOCK(&el->elMutex); |
864 | | int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv); |
865 | | UA_LOCK(&el->elMutex); |
866 | | if(selectStatus < 0) { |
867 | | /* We will retry, only log the error */ |
868 | | UA_LOG_SOCKET_ERRNO_WRAP( |
869 | | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
870 | | "Error during select: %s", errno_str)); |
871 | | return UA_STATUSCODE_GOOD; |
872 | | } |
873 | | |
874 | | /* The self-pipe has received. Clear the buffer by reading. */ |
875 | | if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset))) |
876 | | flushSelfPipe(el->selfpipe[0]); |
877 | | |
878 | | /* Loop over all registered FD to see if an event arrived. Yes, this is why |
879 | | * select is slow for many open sockets. */ |
880 | | for(size_t i = 0; i < el->fdsSize; i++) { |
881 | | UA_RegisteredFD *rfd = el->fds[i]; |
882 | | |
883 | | /* The rfd is already registered for removal. Don't process incoming |
884 | | * events any longer. */ |
885 | | if(rfd->dc.callback) |
886 | | continue; |
887 | | |
888 | | /* Event signaled for the fd? */ |
889 | | short event = 0; |
890 | | if(FD_ISSET(rfd->fd, &readset)) { |
891 | | event = UA_FDEVENT_IN; |
892 | | } else if(FD_ISSET(rfd->fd, &writeset)) { |
893 | | event = UA_FDEVENT_OUT; |
894 | | } else if(FD_ISSET(rfd->fd, &errset)) { |
895 | | event = UA_FDEVENT_ERR; |
896 | | } else { |
897 | | continue; |
898 | | } |
899 | | |
900 | | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
901 | | "Processing event %u on fd %u", (unsigned)event, |
902 | | (unsigned)rfd->fd); |
903 | | |
904 | | /* Call the EventSource callback */ |
905 | | rfd->eventSourceCB(rfd->es, rfd, event); |
906 | | |
907 | | /* The fd has removed itself */ |
908 | | if(i == el->fdsSize || rfd != el->fds[i]) |
909 | | i--; |
910 | | } |
911 | | return UA_STATUSCODE_GOOD; |
912 | | } |
913 | | |
914 | | #else /* defined(UA_HAVE_EPOLL) */ |
915 | | |
916 | | UA_StatusCode |
917 | 0 | UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
918 | 0 | struct epoll_event event; |
919 | 0 | memset(&event, 0, sizeof(struct epoll_event)); |
920 | 0 | event.data.ptr = rfd; |
921 | 0 | event.events = 0; |
922 | 0 | if(rfd->listenEvents & UA_FDEVENT_IN) |
923 | 0 | event.events |= EPOLLIN; |
924 | 0 | if(rfd->listenEvents & UA_FDEVENT_OUT) |
925 | 0 | event.events |= EPOLLOUT; |
926 | |
|
927 | 0 | int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event); |
928 | 0 | if(err != 0) { |
929 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
930 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
931 | 0 | "TCP %u\t| Could not register for epoll (%s)", |
932 | 0 | rfd->fd, errno_str)); |
933 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
934 | 0 | } |
935 | 0 | return UA_STATUSCODE_GOOD; |
936 | 0 | } |
937 | | |
938 | | UA_StatusCode |
939 | 0 | UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
940 | 0 | struct epoll_event event; |
941 | 0 | memset(&event, 0, sizeof(struct epoll_event)); |
942 | 0 | event.data.ptr = rfd; |
943 | 0 | event.events = 0; |
944 | 0 | if(rfd->listenEvents & UA_FDEVENT_IN) |
945 | 0 | event.events |= EPOLLIN; |
946 | 0 | if(rfd->listenEvents & UA_FDEVENT_OUT) |
947 | 0 | event.events |= EPOLLOUT; |
948 | |
|
949 | 0 | int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event); |
950 | 0 | if(err != 0) { |
951 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
952 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
953 | 0 | "TCP %u\t| Could not modify for epoll (%s)", |
954 | 0 | rfd->fd, errno_str)); |
955 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
956 | 0 | } |
957 | 0 | return UA_STATUSCODE_GOOD; |
958 | 0 | } |
959 | | |
960 | | void |
961 | 0 | UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
962 | 0 | int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL); |
963 | 0 | if(res != 0) { |
964 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
965 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
966 | 0 | "TCP %u\t| Could not deregister from epoll (%s)", |
967 | 0 | rfd->fd, errno_str)); |
968 | 0 | } |
969 | 0 | } |
970 | | |
971 | | UA_StatusCode |
972 | 0 | UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) { |
973 | 0 | UA_assert(listenTimeout >= 0); |
974 | | |
975 | | /* If there is a positive timeout, wait at least one millisecond, the |
976 | | * minimum for blocking epoll_wait. This prevents a busy-loop, as the |
977 | | * open62541 library allows even smaller timeouts, which can result in a |
978 | | * zero timeout due to rounding to an integer here. */ |
979 | 0 | int timeout = (int)(listenTimeout / UA_DATETIME_MSEC); |
980 | 0 | if(timeout == 0 && listenTimeout > 0) |
981 | 0 | timeout = 1; |
982 | | |
983 | | /* Poll the registered sockets */ |
984 | 0 | struct epoll_event epoll_events[64]; |
985 | 0 | UA_UNLOCK(&el->elMutex); |
986 | 0 | int events = epoll_wait(el->epollfd, epoll_events, 64, timeout); |
987 | 0 | UA_LOCK(&el->elMutex); |
988 | | |
989 | | /* TODO: Replace with pwait2 for higher-precision timeouts once this is |
990 | | * available in the standard library. |
991 | | * |
992 | | * struct timespec precisionTimeout = { |
993 | | * (long)(listenTimeout / UA_DATETIME_SEC), |
994 | | * (long)((listenTimeout % UA_DATETIME_SEC) * 100) |
995 | | * }; |
996 | | * int events = epoll_pwait2(epollfd, epoll_events, 64, |
997 | | * precisionTimeout, NULL); */ |
998 | | |
999 | | /* Handle error conditions */ |
1000 | 0 | if(events == -1) { |
1001 | 0 | if(errno == EINTR) { |
1002 | | /* We will retry, only log the error */ |
1003 | 0 | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
1004 | 0 | "Timeout during poll"); |
1005 | 0 | return UA_STATUSCODE_GOOD; |
1006 | 0 | } |
1007 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
1008 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
1009 | 0 | "TCP\t| Error %s, closing the server socket", |
1010 | 0 | errno_str)); |
1011 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
1012 | 0 | } |
1013 | | |
1014 | | /* Process all received events */ |
1015 | 0 | for(int i = 0; i < events; i++) { |
1016 | 0 | UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr; |
1017 | | |
1018 | | /* The self-pipe has received */ |
1019 | 0 | if(!rfd) { |
1020 | 0 | flushSelfPipe(el->selfpipe[0]); |
1021 | 0 | continue; |
1022 | 0 | } |
1023 | | |
1024 | | /* The rfd is already registered for removal. Don't process incoming |
1025 | | * events any longer. */ |
1026 | 0 | if(rfd->dc.callback) |
1027 | 0 | continue; |
1028 | | |
1029 | | /* Get the event */ |
1030 | 0 | short revent = 0; |
1031 | 0 | if((epoll_events[i].events & EPOLLIN) == EPOLLIN) { |
1032 | 0 | revent = UA_FDEVENT_IN; |
1033 | 0 | } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) { |
1034 | 0 | revent = UA_FDEVENT_OUT; |
1035 | 0 | } else { |
1036 | 0 | revent = UA_FDEVENT_ERR; |
1037 | 0 | } |
1038 | | |
1039 | | /* Call the EventSource callback */ |
1040 | 0 | rfd->eventSourceCB(rfd->es, rfd, revent); |
1041 | 0 | } |
1042 | 0 | return UA_STATUSCODE_GOOD; |
1043 | 0 | } |
1044 | | |
1045 | | #endif /* defined(UA_HAVE_EPOLL) */ |
1046 | | |
1047 | | #if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__) |
1048 | | int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) { |
1049 | | struct sockaddr_in inaddr; |
1050 | | memset(&inaddr, 0, sizeof(inaddr)); |
1051 | | inaddr.sin_family = AF_INET; |
1052 | | inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
1053 | | inaddr.sin_port = 0; |
1054 | | |
1055 | | SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
1056 | | bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr)); |
1057 | | listen(lst, 1); |
1058 | | |
1059 | | struct sockaddr_storage addr; |
1060 | | memset(&addr, 0, sizeof(addr)); |
1061 | | int len = sizeof(addr); |
1062 | | getsockname(lst, (struct sockaddr*)&addr, &len); |
1063 | | |
1064 | | fds[0] = socket(AF_INET, SOCK_STREAM, 0); |
1065 | | int err = connect(fds[0], (struct sockaddr*)&addr, len); |
1066 | | fds[1] = accept(lst, 0, 0); |
1067 | | #ifdef UA_ARCHITECTURE_WIN32 |
1068 | | closesocket(lst); |
1069 | | #endif |
1070 | | #ifdef __APPLE__ |
1071 | | close(lst); |
1072 | | #endif |
1073 | | |
1074 | | UA_EventLoopPOSIX_setNoSigPipe(fds[0]); |
1075 | | UA_EventLoopPOSIX_setReusable(fds[0]); |
1076 | | UA_EventLoopPOSIX_setNonBlocking(fds[0]); |
1077 | | UA_EventLoopPOSIX_setNoSigPipe(fds[1]); |
1078 | | UA_EventLoopPOSIX_setReusable(fds[1]); |
1079 | | UA_EventLoopPOSIX_setNonBlocking(fds[1]); |
1080 | | return err; |
1081 | | } |
1082 | | #elif defined(__QNX__) |
1083 | | int UA_EventLoopPOSIX_pipe(int fds[2]) { |
1084 | | int err = pipe(fds); |
1085 | | if(err == -1) { |
1086 | | return err; |
1087 | | } |
1088 | | |
1089 | | err = fcntl(fds[0], F_SETFL, O_NONBLOCK); |
1090 | | if(err == -1) { |
1091 | | return err; |
1092 | | } |
1093 | | return err; |
1094 | | } |
1095 | | #endif |
1096 | | |
1097 | | void |
1098 | 0 | UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) { |
1099 | | /* Nothing to do if the EventLoop is not executing */ |
1100 | 0 | if(!el->executing) |
1101 | 0 | return; |
1102 | | |
1103 | | /* Trigger the self-pipe */ |
1104 | | #ifdef UA_ARCHITECTURE_WIN32 |
1105 | | int err = send(el->selfpipe[1], ".", 1, 0); |
1106 | | #else |
1107 | 0 | ssize_t err = write(el->selfpipe[1], ".", 1); |
1108 | 0 | #endif |
1109 | 0 | if(err <= 0) { |
1110 | | UA_LOG_SOCKET_ERRNO_WRAP( |
1111 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
1112 | 0 | "Eventloop\t| Error signaling self-pipe (%s)", errno_str)); |
1113 | 0 | } |
1114 | 0 | } |
1115 | | |
1116 | | #endif |