/src/open62541/arch/posix/eventloop_posix.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* This Source Code Form is subject to the terms of the Mozilla Public |
2 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
3 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
4 | | * |
5 | | * Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer) |
6 | | * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) |
7 | | */ |
8 | | |
9 | | #include "eventloop_posix.h" |
10 | | #include "open62541/plugin/eventloop.h" |
11 | | |
12 | | #if defined(UA_ARCHITECTURE_POSIX) && !defined(UA_ARCHITECTURE_LWIP) || defined(UA_ARCHITECTURE_WIN32) |
13 | | |
14 | | #if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__) |
15 | | #include <time.h> |
16 | | #endif |
17 | | |
18 | | /*********/ |
19 | | /* Timer */ |
20 | | /*********/ |
21 | | |
22 | | static UA_DateTime |
23 | 999 | UA_EventLoopPOSIX_nextTimer(UA_EventLoop *public_el) { |
24 | 999 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
25 | 999 | if(el->delayedHead1 > (UA_DelayedCallback *)0x01 || |
26 | 999 | el->delayedHead2 > (UA_DelayedCallback *)0x01) |
27 | 103 | return el->eventLoop.dateTime_nowMonotonic(&el->eventLoop); |
28 | 896 | return UA_Timer_next(&el->timer); |
29 | 999 | } |
30 | | |
31 | | static UA_StatusCode |
32 | | UA_EventLoopPOSIX_addTimer(UA_EventLoop *public_el, UA_Callback cb, |
33 | | void *application, void *data, UA_Double interval_ms, |
34 | | UA_DateTime *baseTime, UA_TimerPolicy timerPolicy, |
35 | 708 | UA_UInt64 *callbackId) { |
36 | 708 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
37 | 708 | return UA_Timer_add(&el->timer, cb, application, data, interval_ms, |
38 | 708 | public_el->dateTime_nowMonotonic(public_el), |
39 | 708 | baseTime, timerPolicy, callbackId); |
40 | 708 | } |
41 | | |
42 | | static UA_StatusCode |
43 | | UA_EventLoopPOSIX_modifyTimer(UA_EventLoop *public_el, |
44 | | UA_UInt64 callbackId, |
45 | | UA_Double interval_ms, |
46 | | UA_DateTime *baseTime, |
47 | 0 | UA_TimerPolicy timerPolicy) { |
48 | 0 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
49 | 0 | return UA_Timer_modify(&el->timer, callbackId, interval_ms, |
50 | 0 | public_el->dateTime_nowMonotonic(public_el), |
51 | 0 | baseTime, timerPolicy); |
52 | 0 | } |
53 | | |
54 | | static void |
55 | | UA_EventLoopPOSIX_removeTimer(UA_EventLoop *public_el, |
56 | 708 | UA_UInt64 callbackId) { |
57 | 708 | UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; |
58 | 708 | UA_Timer_remove(&el->timer, callbackId); |
59 | 708 | } |
60 | | |
/* Enqueue a delayed callback. The queue is maintained with atomic exchanges
 * only (no UA_LOCK here), so producers append without taking the EventLoop
 * mutex. The caller retains ownership of dc; it is not copied. */
void
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
                                     UA_DelayedCallback *dc) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    dc->next = NULL;

    /* el->delayedTail points either to prev->next or to the head.
     * We need to update two locations:
     * 1: el->delayedTail = &dc->next;
     * 2: *oldtail = dc; (equal to &dc->next)
     *
     * Once we have (1), we "own" the previous-to-last entry. No need to worry
     * about (2), we can adjust it with a delay. This makes the queue
     * "eventually consistent". A consumer that sees a NULL next-pointer
     * before reaching the announced tail must spin until (2) lands. */
    UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
        UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
    UA_atomic_xchg((void**)oldtail, &dc->next);
}
79 | | |
/* Resets the delayed queue and returns the previous head and tail.
 *
 * Invariant: exactly one of the two heads carries the 0x01 sentinel (the
 * inactive side). The other (active) head is NULL while empty or points to
 * the first queued element; values above 0x01 are real pointers. */
static void
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
                  UA_DelayedCallback **oldTail) {
    if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
       el->delayedHead2 <= (UA_DelayedCallback *)0x01)
        return; /* The queue is empty */

    /* The head not holding the sentinel is the currently active side */
    UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
    UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
    UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;

    /* Switch active/inactive by resetting the sentinel values. The (old) active
     * head points to an element which we return. Parallel threads continue to
     * add elements to the queue "below" the first element. */
    UA_atomic_xchg((void**)inactiveHead, NULL);
    *oldHead = (UA_DelayedCallback *)
        UA_atomic_xchg((void**)activeHead, (void*)0x01);

    /* Make the tail point to the (new) active head. Return the value of last
     * tail. When iterating over the queue elements, we need to find this tail
     * as the last element. If we find a NULL next-pointer before hitting the
     * tail spinlock until the pointer updates (eventually consistent). */
    *oldTail = (UA_DelayedCallback*)
        UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
}
106 | | |
/* Remove a delayed callback that was enqueued but not yet executed. Because
 * the queue is append-only and lock-free, removal drains the whole queue and
 * re-adds every entry except dc. Note: dc is only unlinked here, not freed. */
static void
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
                                        UA_DelayedCallback *dc) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
    UA_LOCK(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *cur = NULL, *tail = NULL;
    resetDelayedQueue(el, &cur, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; cur; cur = next) {
        /* Spin-loop until the next-pointer of cur is updated.
         * The element pointed to by tail must appear eventually. */
        next = cur->next;
        while(!next && cur != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
        if(cur == dc)
            continue; /* Skip (i.e. remove) the entry we are looking for */
        UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
    }

    UA_UNLOCK(&el->elMutex);
}
132 | | |
/* Execute all delayed callbacks currently queued. The queue is drained once
 * up front via resetDelayedQueue; callbacks that re-register themselves land
 * in the fresh queue and are picked up in a later iteration. Called with the
 * EventLoop mutex held. */
static void
processDelayed(UA_EventLoopPOSIX *el) {
    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Process delayed callbacks");

    UA_LOCK_ASSERT(&el->elMutex);

    /* Reset and get the old head and tail */
    UA_DelayedCallback *dc = NULL, *tail = NULL;
    resetDelayedQueue(el, &dc, &tail);

    /* Loop until we reach the tail (or head and tail are both NULL) */
    UA_DelayedCallback *next;
    for(; dc; dc = next) {
        /* Spin until the producer has published the next-pointer
         * (eventually-consistent queue, see addDelayedCallback) */
        next = dc->next;
        while(!next && dc != tail)
            next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
        if(!dc->callback)
            continue; /* Entry without a callback: nothing to execute */
        dc->callback(dc->application, dc->context);
    }
}
155 | | |
156 | | /***********************/ |
157 | | /* EventLoop Lifecycle */ |
158 | | /***********************/ |
159 | | |
/* Start the EventLoop: configure the clock sources, create the self-pipe
 * (used to interrupt a blocking poll from another thread), create the epoll
 * fd (when available) and start all registered EventSources.
 *
 * Returns BADINTERNALERROR when the EventLoop is not FRESH/STOPPED or when
 * the self-pipe/epoll setup fails; otherwise the OR-ed result of the
 * EventSource start methods. */
static UA_StatusCode
UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) {
    UA_LOCK(&el->elMutex);

    /* Can only start from the FRESH or STOPPED state */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH &&
       el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) {
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Starting the EventLoop");

    /* Setting the clock source. Optional Int32 parameters "clock-source"
     * and "clock-source-monotonic" override the defaults. */
    const UA_Int32 *cs = (const UA_Int32*)
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
                                 UA_QUALIFIEDNAME(0, "clock-source"),
                                 &UA_TYPES[UA_TYPES_INT32]);
    const UA_Int32 *csm = (const UA_Int32*)
        UA_KeyValueMap_getScalar(&el->eventLoop.params,
                                 UA_QUALIFIEDNAME(0, "clock-source-monotonic"),
                                 &UA_TYPES[UA_TYPES_INT32]);
#if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__)
    el->clockSource = CLOCK_REALTIME;
    if(cs)
        el->clockSource = *cs;

    /* Prefer CLOCK_MONOTONIC_RAW where the platform provides it */
# ifdef CLOCK_MONOTONIC_RAW
    el->clockSourceMonotonic = CLOCK_MONOTONIC_RAW;
# else
    el->clockSourceMonotonic = CLOCK_MONOTONIC;
# endif
    if(csm)
        el->clockSourceMonotonic = *csm;
#else
    if(cs || csm) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                       "Eventloop\t| Cannot set a custom clock source");
    }
#endif

    /* Create the self-pipe */
    int err = UA_EventLoopPOSIX_pipe(el->selfpipe);
    if(err != 0) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not create the self-pipe (%s)",
                          errno_str));
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Create the epoll socket */
#ifdef UA_HAVE_EPOLL
    el->epollfd = epoll_create1(0);
    if(el->epollfd == -1) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not create the epoll socket (%s)",
                          errno_str));
        /* Roll back the self-pipe before failing */
        UA_close(el->selfpipe[0]);
        UA_close(el->selfpipe[1]);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* epoll always listens on the self-pipe. This is the only epoll_event that
     * has a NULL data pointer. */
    struct epoll_event event;
    memset(&event, 0, sizeof(struct epoll_event));
    event.events = EPOLLIN;
    err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, el->selfpipe[0], &event);
    if(err != 0) {
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "Eventloop\t| Could not register the self-pipe for epoll (%s)",
                          errno_str));
        /* Roll back both the self-pipe and the epoll fd before failing */
        UA_close(el->selfpipe[0]);
        UA_close(el->selfpipe[1]);
        close(el->epollfd);
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }
#endif

    /* Start the EventSources. Note: failures are OR-ed into the result but
     * do not abort starting the remaining EventSources. */
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    UA_EventSource *es = el->eventLoop.eventSources;
    while(es) {
        res |= es->start(es);
        es = es->next;
    }

    /* Dirty-write the state that is const "from the outside" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STARTED;

    UA_UNLOCK(&el->elMutex);
    return res;
}
260 | | |
/* Check whether the EventLoop can transition to STOPPED: all EventSources
 * must be stopped and no delayed callbacks may remain. Then the self-pipe
 * (and epoll fd) are closed and the state is set to STOPPED. Called with the
 * EventLoop mutex held. */
static void
checkClosed(UA_EventLoopPOSIX *el) {
    UA_LOCK_ASSERT(&el->elMutex);

    UA_EventSource *es = el->eventLoop.eventSources;
    while(es) {
        if(es->state != UA_EVENTSOURCESTATE_STOPPED)
            return;
        es = es->next;
    }

    /* Not closed until all delayed callbacks are processed. One of the two
     * heads always holds the (non-NULL) 0x01 sentinel; the other is NULL
     * exactly when the queue is empty. Hence both heads non-NULL means
     * entries are still queued. */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        return;

    /* Close the self-pipe when everything else is done */
    UA_close(el->selfpipe[0]);
    UA_close(el->selfpipe[1]);

    /* Dirty-write the state that is const "from the outside" */
    *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state =
        UA_EVENTLOOPSTATE_STOPPED;

    /* Close the epoll/IOCP socket once all EventSources have shut down */
#ifdef UA_HAVE_EPOLL
    UA_close(el->epollfd);
#endif

    UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "The EventLoop has stopped");
}
292 | | |
293 | | static void |
294 | 378 | UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) { |
295 | 378 | UA_LOCK(&el->elMutex); |
296 | | |
297 | 378 | if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) { |
298 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
299 | 0 | "The EventLoop is not running, cannot be stopped"); |
300 | 0 | UA_UNLOCK(&el->elMutex); |
301 | 0 | return; |
302 | 0 | } |
303 | | |
304 | 378 | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
305 | 378 | "Stopping the EventLoop"); |
306 | | |
307 | | /* Set to STOPPING to prevent "normal use" */ |
308 | 378 | *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state = |
309 | 378 | UA_EVENTLOOPSTATE_STOPPING; |
310 | | |
311 | | /* Stop all event sources (asynchronous) */ |
312 | 378 | UA_EventSource *es = el->eventLoop.eventSources; |
313 | 1.89k | for(; es; es = es->next) { |
314 | 1.51k | if(es->state == UA_EVENTSOURCESTATE_STARTING || |
315 | 1.51k | es->state == UA_EVENTSOURCESTATE_STARTED) { |
316 | 1.51k | es->stop(es); |
317 | 1.51k | } |
318 | 1.51k | } |
319 | | |
320 | | /* Set to STOPPED if all EventSources are STOPPED */ |
321 | 378 | checkClosed(el); |
322 | | |
323 | 378 | UA_UNLOCK(&el->elMutex); |
324 | 378 | } |
325 | | |
/* Run one iteration of the EventLoop with a maximum wait of <timeout>
 * milliseconds: execute due timed callbacks, process delayed callbacks, then
 * poll the registered file descriptors until the next timer is due or the
 * timeout elapses. Not reentrant: calling run from within a callback of the
 * same EventLoop returns BADINTERNALERROR. */
static UA_StatusCode
UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
    UA_LOCK(&el->elMutex);

    /* Reentrancy guard */
    if(el->executing) {
        UA_LOG_ERROR(el->eventLoop.logger,
                     UA_LOGCATEGORY_EVENTLOOP,
                     "Cannot run EventLoop from the run method itself");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    el->executing = true;

    if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH ||
       el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot run a stopped EventLoop");
        el->executing = false;
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                 "Iterate the EventLoop");

    /* Process cyclic callbacks. dateNext is when the next timer is due. */
    UA_DateTime dateBefore =
        el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);

    UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore);

    /* Process delayed callbacks here:
     * - Removes closed sockets already here instead of polling them again.
     * - The timeout for polling is selected to be ready in time for the next
     *   cyclic callback. So we want to do little work between the timeout
     *   running out and executing the due cyclic callbacks. */
    processDelayed(el);

    /* A delayed callback could create another delayed callback (or re-add
     * itself). In that case we don't want to wait (indefinitely) for an event
     * to happen. Process queued events but don't sleep. Then process the
     * delayed callbacks in the next iteration.
     * (Both heads non-NULL <=> delayed queue non-empty, see checkClosed.) */
    if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
        timeout = 0;

    /* Compute the remaining time: wake up for the next timer, but never
     * later than dateBefore + timeout */
    UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC);
    if(dateNext > maxDate)
        dateNext = maxDate;
    UA_DateTime listenTimeout =
        dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop);
    if(listenTimeout < 0)
        listenTimeout = 0;

    /* Listen on the active file-descriptors (sockets) from the
     * ConnectionManagers */
    UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout);

    /* Check if the last EventSource was successfully stopped */
    if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING)
        checkClosed(el);

    el->executing = false;
    UA_UNLOCK(&el->elMutex);
    return rv;
}
393 | | |
394 | | /*****************************/ |
395 | | /* Registering Event Sources */ |
396 | | /*****************************/ |
397 | | |
398 | | static UA_StatusCode |
399 | | UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el, |
400 | 1.51k | UA_EventSource *es) { |
401 | 1.51k | UA_LOCK(&el->elMutex); |
402 | | |
403 | | /* Already registered? */ |
404 | 1.51k | if(es->state != UA_EVENTSOURCESTATE_FRESH) { |
405 | 0 | UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
406 | 0 | "Cannot register the EventSource \"%.*s\": " |
407 | 0 | "already registered", |
408 | 0 | (int)es->name.length, (char*)es->name.data); |
409 | 0 | UA_UNLOCK(&el->elMutex); |
410 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
411 | 0 | } |
412 | | |
413 | | /* Add to linked list */ |
414 | 1.51k | es->next = el->eventLoop.eventSources; |
415 | 1.51k | el->eventLoop.eventSources = es; |
416 | | |
417 | 1.51k | es->eventLoop = &el->eventLoop; |
418 | 1.51k | es->state = UA_EVENTSOURCESTATE_STOPPED; |
419 | | |
420 | | /* Start if the entire EventLoop is started */ |
421 | 1.51k | UA_StatusCode res = UA_STATUSCODE_GOOD; |
422 | 1.51k | if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED) |
423 | 0 | res = es->start(es); |
424 | | |
425 | 1.51k | UA_UNLOCK(&el->elMutex); |
426 | 1.51k | return res; |
427 | 1.51k | } |
428 | | |
429 | | static UA_StatusCode |
430 | | UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el, |
431 | 1.51k | UA_EventSource *es) { |
432 | 1.51k | UA_LOCK(&el->elMutex); |
433 | | |
434 | 1.51k | if(es->state != UA_EVENTSOURCESTATE_STOPPED) { |
435 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
436 | 0 | "Cannot deregister the EventSource %.*s: " |
437 | 0 | "Has to be stopped first", |
438 | 0 | (int)es->name.length, es->name.data); |
439 | 0 | UA_UNLOCK(&el->elMutex); |
440 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
441 | 0 | } |
442 | | |
443 | | /* Remove from the linked list */ |
444 | 1.51k | UA_EventSource **s = &el->eventLoop.eventSources; |
445 | 1.51k | while(*s) { |
446 | 1.51k | if(*s == es) { |
447 | 1.51k | *s = es->next; |
448 | 1.51k | break; |
449 | 1.51k | } |
450 | 0 | s = &(*s)->next; |
451 | 0 | } |
452 | | |
453 | | /* Set the state to non-registered */ |
454 | 1.51k | es->state = UA_EVENTSOURCESTATE_FRESH; |
455 | | |
456 | 1.51k | UA_UNLOCK(&el->elMutex); |
457 | 1.51k | return UA_STATUSCODE_GOOD; |
458 | 1.51k | } |
459 | | |
460 | | /***************/ |
461 | | /* Time Domain */ |
462 | | /***************/ |
463 | | |
464 | | static UA_DateTime |
465 | 438k | UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) { |
466 | 438k | #if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__) |
467 | 438k | UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el; |
468 | 438k | struct timespec ts; |
469 | 438k | int res = clock_gettime(pel->clockSource, &ts); |
470 | 438k | if(UA_UNLIKELY(res != 0)) |
471 | 0 | return 0; |
472 | 438k | return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH; |
473 | | #else |
474 | | return UA_DateTime_now(); |
475 | | #endif |
476 | 438k | } |
477 | | |
478 | | static UA_DateTime |
479 | 4.71k | UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) { |
480 | 4.71k | #if defined(UA_ARCHITECTURE_POSIX) && !defined(__APPLE__) && !defined(__MACH__) |
481 | 4.71k | UA_EventLoopPOSIX *pel = (UA_EventLoopPOSIX*)el; |
482 | 4.71k | struct timespec ts; |
483 | 4.71k | int res = clock_gettime(pel->clockSourceMonotonic, &ts); |
484 | 4.71k | if(UA_UNLIKELY(res != 0)) |
485 | 0 | return 0; |
486 | | /* Also add the unix epoch for the monotonic clock. So we get a "normal" |
487 | | * output when a "normal" source is configured. */ |
488 | 4.71k | return (ts.tv_sec * UA_DATETIME_SEC) + (ts.tv_nsec / 100) + UA_DATETIME_UNIX_EPOCH; |
489 | | #else |
490 | | return UA_DateTime_nowMonotonic(); |
491 | | #endif |
492 | 4.71k | } |
493 | | |
/* Offset between local time and UTC. Simply delegates to the default
 * implementation, regardless of the configured clock source. */
static UA_Int64
UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) {
    /* TODO: Fix for custom clock sources */
    return UA_DateTime_localTimeUtcOffset();
}
499 | | |
500 | | /*************************/ |
501 | | /* Initialize and Delete */ |
502 | | /*************************/ |
503 | | |
/* Delete the EventLoop and release all resources. Only allowed in the
 * STOPPED or FRESH state: deregisters and frees all EventSources, clears the
 * timer, runs the remaining delayed callbacks and frees the EventLoop. */
static UA_StatusCode
UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) {
    UA_LOCK(&el->elMutex);

    /* Check if the EventLoop can be deleted */
    if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED &&
       el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) {
        UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                       "Cannot delete a running EventLoop");
        UA_UNLOCK(&el->elMutex);
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Deregister and delete all the EventSources.
     * NOTE(review): deregisterEventSource takes elMutex again while we hold
     * it -- this relies on UA_LOCK being recursive or a no-op; confirm. */
    while(el->eventLoop.eventSources) {
        UA_EventSource *es = el->eventLoop.eventSources;
        UA_EventLoopPOSIX_deregisterEventSource(el, es);
        es->free(es);
    }

    /* Remove the repeated timed callbacks */
    UA_Timer_clear(&el->timer);

    /* Process remaining delayed callbacks */
    processDelayed(el);

#ifdef UA_ARCHITECTURE_WIN32
    /* Stop the Windows networking subsystem */
    WSACleanup();
#endif

    UA_KeyValueMap_clear(&el->eventLoop.params);

    /* Clean up */
    UA_UNLOCK(&el->elMutex);
    UA_LOCK_DESTROY(&el->elMutex);
    UA_free(el);
    return UA_STATUSCODE_GOOD;
}
543 | | |
544 | | static void |
545 | 847k | UA_EventLoopPOSIX_lock(UA_EventLoop *public_el) { |
546 | 847k | UA_LOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex); |
547 | 847k | } |
548 | | static void |
549 | 847k | UA_EventLoopPOSIX_unlock(UA_EventLoop *public_el) { |
550 | 847k | UA_UNLOCK(&((UA_EventLoopPOSIX*)public_el)->elMutex); |
551 | 847k | } |
552 | | |
/* Allocate and initialize a POSIX EventLoop. Returns NULL when allocation
 * fails. The caller owns the returned EventLoop and releases it via
 * eventLoop->free(). */
UA_EventLoop *
UA_EventLoop_new_POSIX(const UA_Logger *logger) {
    UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)
        UA_calloc(1, sizeof(UA_EventLoopPOSIX));
    if(!el)
        return NULL;

    UA_LOCK_INIT(&el->elMutex);
    UA_Timer_init(&el->timer);

    /* Initialize the delayed-callback queue: head1 (NULL from calloc) is the
     * active head, head2 carries the inactive-side sentinel */
    el->delayedTail = &el->delayedHead1;
    el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */

#ifdef UA_ARCHITECTURE_WIN32
    /* Start the WSA networking subsystem on Windows */
    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

    /* Set the public EventLoop content: wire the vtable function pointers to
     * the POSIX implementations above */
    el->eventLoop.logger = logger;

    el->eventLoop.start = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start;
    el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop;
    el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free;
    el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run;
    el->eventLoop.cancel = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_cancel;

    el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now;
    el->eventLoop.dateTime_nowMonotonic =
        UA_EventLoopPOSIX_DateTime_nowMonotonic;
    el->eventLoop.dateTime_localTimeUtcOffset =
        UA_EventLoopPOSIX_DateTime_localTimeUtcOffset;

    el->eventLoop.nextTimer = UA_EventLoopPOSIX_nextTimer;
    el->eventLoop.addTimer = UA_EventLoopPOSIX_addTimer;
    el->eventLoop.modifyTimer = UA_EventLoopPOSIX_modifyTimer;
    el->eventLoop.removeTimer = UA_EventLoopPOSIX_removeTimer;
    el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback;
    el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback;

    el->eventLoop.registerEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_registerEventSource;
    el->eventLoop.deregisterEventSource =
        (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*))
        UA_EventLoopPOSIX_deregisterEventSource;

    el->eventLoop.lock = UA_EventLoopPOSIX_lock;
    el->eventLoop.unlock = UA_EventLoopPOSIX_unlock;

    return &el->eventLoop;
}
607 | | |
608 | | /***************************/ |
609 | | /* Network Buffer Handling */ |
610 | | /***************************/ |
611 | | |
612 | | UA_StatusCode |
613 | | UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, |
614 | | uintptr_t connectionId, |
615 | | UA_ByteString *buf, |
616 | 104 | size_t bufSize) { |
617 | 104 | UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; |
618 | 104 | if(pcm->txBuffer.length == 0) |
619 | 104 | return UA_ByteString_allocBuffer(buf, bufSize); |
620 | 0 | if(pcm->txBuffer.length < bufSize) |
621 | 0 | return UA_STATUSCODE_BADOUTOFMEMORY; |
622 | 0 | *buf = pcm->txBuffer; |
623 | 0 | buf->length = bufSize; |
624 | 0 | return UA_STATUSCODE_GOOD; |
625 | 0 | } |
626 | | |
627 | | void |
628 | | UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, |
629 | | uintptr_t connectionId, |
630 | 104 | UA_ByteString *buf) { |
631 | 104 | UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; |
632 | 104 | if(pcm->txBuffer.data == buf->data) |
633 | 0 | UA_ByteString_init(buf); |
634 | 104 | else |
635 | 104 | UA_ByteString_clear(buf); |
636 | 104 | } |
637 | | |
/* (Re)allocate the static receive buffer and, if configured, a static send
 * buffer for a ConnectionManager. Sizes come from the optional UInt32
 * parameters "recv-bufsize" and "send-bufsize". */
UA_StatusCode
UA_EventLoopPOSIX_allocateStaticBuffers(UA_POSIXConnectionManager *pcm) {
    UA_StatusCode res = UA_STATUSCODE_GOOD;
    UA_UInt32 rxBufSize = 2u << 16; /* The default is 128 KiB (2 << 16 = 131072) */
    const UA_UInt32 *configRxBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "recv-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(configRxBufSize)
        rxBufSize = *configRxBufSize;
    if(pcm->rxBuffer.length != rxBufSize) {
        /* Size changed (or first allocation): replace the rx buffer */
        UA_ByteString_clear(&pcm->rxBuffer);
        res = UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize);
    }

    /* A static tx buffer is only allocated when explicitly configured */
    const UA_UInt32 *txBufSize = (const UA_UInt32 *)
        UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params,
                                 UA_QUALIFIEDNAME(0, "send-bufsize"),
                                 &UA_TYPES[UA_TYPES_UINT32]);
    if(txBufSize && pcm->txBuffer.length != *txBufSize) {
        UA_ByteString_clear(&pcm->txBuffer);
        res |= UA_ByteString_allocBuffer(&pcm->txBuffer, *txBufSize);
    }
    return res;
}
663 | | |
664 | | /******************/ |
665 | | /* Socket Options */ |
666 | | /******************/ |
667 | | |
668 | | enum ZIP_CMP |
669 | 2.00k | cmpFD(const UA_FD *a, const UA_FD *b) { |
670 | 2.00k | if(*a == *b) |
671 | 821 | return ZIP_CMP_EQ; |
672 | 1.17k | return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE; |
673 | 2.00k | } |
674 | | |
675 | | UA_StatusCode |
676 | 708 | UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) { |
677 | 708 | #ifndef UA_ARCHITECTURE_WIN32 |
678 | 708 | int opts = fcntl(sockfd, F_GETFL); |
679 | 708 | if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0) |
680 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
681 | | #else |
682 | | u_long iMode = 1; |
683 | | if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR) |
684 | | return UA_STATUSCODE_BADINTERNALERROR; |
685 | | #endif |
686 | 708 | return UA_STATUSCODE_GOOD; |
687 | 708 | } |
688 | | |
689 | | UA_StatusCode |
690 | 826 | UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) { |
691 | | #ifdef SO_NOSIGPIPE |
692 | | int val = 1; |
693 | | int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)); |
694 | | if(res < 0) |
695 | | return UA_STATUSCODE_BADINTERNALERROR; |
696 | | #endif |
697 | 826 | return UA_STATUSCODE_GOOD; |
698 | 826 | } |
699 | | |
700 | | UA_StatusCode |
701 | 821 | UA_EventLoopPOSIX_setReusable(UA_FD sockfd) { |
702 | 821 | int enableReuseVal = 1; |
703 | 821 | #ifndef UA_ARCHITECTURE_WIN32 |
704 | 821 | int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, |
705 | 821 | (const char*)&enableReuseVal, sizeof(enableReuseVal)); |
706 | 821 | res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, |
707 | 821 | (const char*)&enableReuseVal, sizeof(enableReuseVal)); |
708 | 821 | return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR; |
709 | | #else |
710 | | int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, |
711 | | (const char*)&enableReuseVal, sizeof(enableReuseVal)); |
712 | | return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR; |
713 | | #endif |
714 | 821 | } |
715 | | |
716 | | /************************/ |
717 | | /* Select / epoll Logic */ |
718 | | /************************/ |
719 | | |
720 | | /* Re-arm the self-pipe socket for the next signal by reading from it */ |
721 | | static void |
722 | 0 | flushSelfPipe(UA_SOCKET s) { |
723 | 0 | char buf[128]; |
724 | | #ifdef UA_ARCHITECTURE_WIN32 |
725 | | recv(s, buf, 128, 0); |
726 | | #else |
727 | 0 | ssize_t i; |
728 | 0 | do { |
729 | 0 | i = read(s, buf, 128); |
730 | 0 | } while(i > 0); |
731 | 0 | #endif |
732 | 0 | } |
733 | | |
734 | | #if !defined(UA_HAVE_EPOLL) |
735 | | |
736 | | UA_StatusCode |
737 | | UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
738 | | UA_LOCK_ASSERT(&el->elMutex); |
739 | | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
740 | | "Registering fd: %u", (unsigned)rfd->fd); |
741 | | |
742 | | /* Realloc */ |
743 | | UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**) |
744 | | UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * (el->fdsSize + 1)); |
745 | | if(!fds_tmp) { |
746 | | return UA_STATUSCODE_BADOUTOFMEMORY; |
747 | | } |
748 | | el->fds = fds_tmp; |
749 | | |
750 | | /* Add to the last entry */ |
751 | | el->fds[el->fdsSize] = rfd; |
752 | | el->fdsSize++; |
753 | | return UA_STATUSCODE_GOOD; |
754 | | } |
755 | | |
756 | | UA_StatusCode |
757 | | UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
758 | | /* Do nothing, it is enough if the data was changed in the rfd */ |
759 | | UA_LOCK_ASSERT(&el->elMutex); |
760 | | return UA_STATUSCODE_GOOD; |
761 | | } |
762 | | |
763 | | void |
764 | | UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
765 | | UA_LOCK_ASSERT(&el->elMutex); |
766 | | UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
767 | | "Unregistering fd: %u", (unsigned)rfd->fd); |
768 | | |
769 | | /* Find the entry */ |
770 | | size_t i = 0; |
771 | | for(; i < el->fdsSize; i++) { |
772 | | if(el->fds[i] == rfd) |
773 | | break; |
774 | | } |
775 | | |
776 | | /* Not found? */ |
777 | | if(i == el->fdsSize) |
778 | | return; |
779 | | |
780 | | if(el->fdsSize > 1) { |
781 | | /* Move the last entry in the ith slot and realloc. */ |
782 | | el->fdsSize--; |
783 | | el->fds[i] = el->fds[el->fdsSize]; |
784 | | UA_RegisteredFD **fds_tmp = (UA_RegisteredFD**) |
785 | | UA_realloc(el->fds, sizeof(UA_RegisteredFD*) * el->fdsSize); |
786 | | /* if realloc fails the fds are still in a correct state with |
787 | | * possibly lost memory, so failing silently here is ok */ |
788 | | if(fds_tmp) |
789 | | el->fds = fds_tmp; |
790 | | } else { |
791 | | /* Remove the last entry */ |
792 | | UA_free(el->fds); |
793 | | el->fds = NULL; |
794 | | el->fdsSize = 0; |
795 | | } |
796 | | } |
797 | | |
798 | | static UA_FD |
799 | | setFDSets(UA_EventLoopPOSIX *el, fd_set *readset, fd_set *writeset, fd_set *errset) { |
800 | | UA_LOCK_ASSERT(&el->elMutex); |
801 | | |
802 | | FD_ZERO(readset); |
803 | | FD_ZERO(writeset); |
804 | | FD_ZERO(errset); |
805 | | |
806 | | /* Always listen on the read-end of the pipe */ |
807 | | UA_FD highestfd = el->selfpipe[0]; |
808 | | FD_SET(el->selfpipe[0], readset); |
809 | | |
810 | | for(size_t i = 0; i < el->fdsSize; i++) { |
811 | | UA_FD currentFD = el->fds[i]->fd; |
812 | | |
813 | | /* Add to the fd_sets */ |
814 | | if(el->fds[i]->listenEvents & UA_FDEVENT_IN) |
815 | | FD_SET(currentFD, readset); |
816 | | if(el->fds[i]->listenEvents & UA_FDEVENT_OUT) |
817 | | FD_SET(currentFD, writeset); |
818 | | |
819 | | /* Always return errors */ |
820 | | FD_SET(currentFD, errset); |
821 | | |
822 | | /* Highest fd? */ |
823 | | if(currentFD > highestfd) |
824 | | highestfd = currentFD; |
825 | | } |
826 | | return highestfd; |
827 | | } |
828 | | |
/* select(2)-based polling backend. Waits up to listenTimeout (UA_DateTime
 * 100ns ticks) for activity on the registered fds and dispatches events to
 * the EventSource callbacks. The EventLoop mutex is held on entry and exit
 * but released while blocking in select. Transient select errors are only
 * logged; the caller is expected to retry on the next iteration. */
UA_StatusCode
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
    UA_assert(listenTimeout >= 0);
    UA_LOCK_ASSERT(&el->elMutex);

    fd_set readset, writeset, errset;
    UA_FD highestfd = setFDSets(el, &readset, &writeset, &errset);

    /* Nothing to do? (Can only happen if even the self-pipe fd is invalid,
     * since setFDSets starts from el->selfpipe[0].) */
    if(highestfd == UA_INVALID_FD) {
        UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                     "No valid FDs for processing");
        return UA_STATUSCODE_GOOD;
    }

    /* Convert the 100ns-tick timeout into a struct timeval (seconds +
     * microseconds). Windows uses long fields instead of time_t/suseconds_t. */
    struct timeval tmptv = {
#ifndef UA_ARCHITECTURE_WIN32
        (time_t)(listenTimeout / UA_DATETIME_SEC),
        (suseconds_t)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
#else
        (long)(listenTimeout / UA_DATETIME_SEC),
        (long)((listenTimeout % UA_DATETIME_SEC) / UA_DATETIME_USEC)
#endif
    };

    /* Release the mutex while blocking in the kernel so other threads can
     * interact with the EventLoop (e.g. trigger the self-pipe). */
    UA_UNLOCK(&el->elMutex);
    int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
    UA_LOCK(&el->elMutex);
    if(selectStatus < 0) {
        /* We will retry, only log the error */
        UA_LOG_SOCKET_ERRNO_WRAP(
            UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                           "Error during select: %s", errno_str));
        return UA_STATUSCODE_GOOD;
    }

    /* The self-pipe has received. Clear the buffer by reading. */
    if(UA_UNLIKELY(FD_ISSET(el->selfpipe[0], &readset)))
        flushSelfPipe(el->selfpipe[0]);

    /* Loop over all registered FD to see if an event arrived. Yes, this is why
     * select is slow for many open sockets. */
    for(size_t i = 0; i < el->fdsSize; i++) {
        UA_RegisteredFD *rfd = el->fds[i];

        /* The rfd is already registered for removal. Don't process incoming
         * events any longer. */
        if(rfd->dc.callback)
            continue;

        /* Event signaled for the fd? Read events take precedence over write
         * events, which take precedence over errors. */
        short event = 0;
        if(FD_ISSET(rfd->fd, &readset)) {
            event = UA_FDEVENT_IN;
        } else if(FD_ISSET(rfd->fd, &writeset)) {
            event = UA_FDEVENT_OUT;
        } else if(FD_ISSET(rfd->fd, &errset)) {
            event = UA_FDEVENT_ERR;
        } else {
            continue;
        }

        UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                     "Processing event %u on fd %u", (unsigned)event,
                     (unsigned)rfd->fd);

        /* Call the EventSource callback. The callback may deregister its own
         * fd, which swaps the tail entry into slot i and shrinks the array. */
        rfd->eventSourceCB(rfd->es, rfd, event);

        /* The fd has removed itself: step back so the entry that was moved
         * into slot i is processed on the next iteration. (The i == fdsSize
         * check short-circuits before el->fds[i] is read.) */
        if(i == el->fdsSize || rfd != el->fds[i])
            i--;
    }
    return UA_STATUSCODE_GOOD;
}
904 | | |
905 | | #else /* defined(UA_HAVE_EPOLL) */ |
906 | | |
907 | | UA_StatusCode |
908 | 821 | UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
909 | 821 | struct epoll_event event; |
910 | 821 | memset(&event, 0, sizeof(struct epoll_event)); |
911 | 821 | event.data.ptr = rfd; |
912 | 821 | event.events = 0; |
913 | 821 | if(rfd->listenEvents & UA_FDEVENT_IN) |
914 | 644 | event.events |= EPOLLIN; |
915 | 821 | if(rfd->listenEvents & UA_FDEVENT_OUT) |
916 | 0 | event.events |= EPOLLOUT; |
917 | | |
918 | 821 | int err = epoll_ctl(el->epollfd, EPOLL_CTL_ADD, rfd->fd, &event); |
919 | 821 | if(err != 0) { |
920 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
921 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
922 | 0 | "TCP %u\t| Could not register for epoll (%s)", |
923 | 0 | rfd->fd, errno_str)); |
924 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
925 | 0 | } |
926 | 821 | return UA_STATUSCODE_GOOD; |
927 | 821 | } |
928 | | |
929 | | UA_StatusCode |
930 | 0 | UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
931 | 0 | struct epoll_event event; |
932 | 0 | memset(&event, 0, sizeof(struct epoll_event)); |
933 | 0 | event.data.ptr = rfd; |
934 | 0 | event.events = 0; |
935 | 0 | if(rfd->listenEvents & UA_FDEVENT_IN) |
936 | 0 | event.events |= EPOLLIN; |
937 | 0 | if(rfd->listenEvents & UA_FDEVENT_OUT) |
938 | 0 | event.events |= EPOLLOUT; |
939 | |
|
940 | 0 | int err = epoll_ctl(el->epollfd, EPOLL_CTL_MOD, rfd->fd, &event); |
941 | 0 | if(err != 0) { |
942 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
943 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
944 | 0 | "TCP %u\t| Could not modify for epoll (%s)", |
945 | 0 | rfd->fd, errno_str)); |
946 | 0 | return UA_STATUSCODE_BADINTERNALERROR; |
947 | 0 | } |
948 | 0 | return UA_STATUSCODE_GOOD; |
949 | 0 | } |
950 | | |
951 | | void |
952 | 821 | UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd) { |
953 | 821 | int res = epoll_ctl(el->epollfd, EPOLL_CTL_DEL, rfd->fd, NULL); |
954 | 821 | if(res != 0) { |
955 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
956 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, |
957 | 0 | "TCP %u\t| Could not deregister from epoll (%s)", |
958 | 0 | rfd->fd, errno_str)); |
959 | 0 | } |
960 | 821 | } |
961 | | |
/* epoll-based polling backend. Waits up to listenTimeout (UA_DateTime 100ns
 * ticks, rounded to milliseconds) and dispatches events to the EventSource
 * callbacks. The EventLoop mutex is released while blocking in epoll_wait.
 * EINTR leads to a normal return (the caller retries); any other epoll
 * failure returns BADINTERNALERROR. */
UA_StatusCode
UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
    UA_assert(listenTimeout >= 0);

    /* If there is a positive timeout, wait at least one millisecond, the
     * minimum for blocking epoll_wait. This prevents a busy-loop, as the
     * open62541 library allows even smaller timeouts, which can result in a
     * zero timeout due to rounding to an integer here. */
    int timeout = (int)(listenTimeout / UA_DATETIME_MSEC);
    if(timeout == 0 && listenTimeout > 0)
        timeout = 1;

    /* Poll the registered sockets. The mutex is released so other threads
     * can interact with the EventLoop (e.g. trigger the self-pipe) while we
     * block in the kernel. */
    struct epoll_event epoll_events[64];
    UA_UNLOCK(&el->elMutex);
    int events = epoll_wait(el->epollfd, epoll_events, 64, timeout);
    UA_LOCK(&el->elMutex);

    /* TODO: Replace with pwait2 for higher-precision timeouts once this is
     * available in the standard library.
     *
     * struct timespec precisionTimeout = {
     *    (long)(listenTimeout / UA_DATETIME_SEC),
     *    (long)((listenTimeout % UA_DATETIME_SEC) * 100)
     * };
     * int events = epoll_pwait2(epollfd, epoll_events, 64,
     *                           precisionTimeout, NULL); */

    /* Handle error conditions */
    if(events == -1) {
        if(errno == EINTR) {
            /* We will retry, only log the error */
            UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
                         "Timeout during poll");
            return UA_STATUSCODE_GOOD;
        }
        UA_LOG_SOCKET_ERRNO_WRAP(
           UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
                          "TCP\t| Error %s, closing the server socket",
                          errno_str));
        return UA_STATUSCODE_BADINTERNALERROR;
    }

    /* Process all received events */
    for(int i = 0; i < events; i++) {
        UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;

        /* The self-pipe has received. (NOTE(review): a NULL data.ptr appears
         * to identify the self-pipe registration, which is set up outside
         * this function -- confirm against the setup code.) Drain the pipe
         * so it can signal again. */
        if(!rfd) {
            flushSelfPipe(el->selfpipe[0]);
            continue;
        }

        /* The rfd is already registered for removal. Don't process incoming
         * events any longer. */
        if(rfd->dc.callback)
            continue;

        /* Get the event. EPOLLIN takes precedence over EPOLLOUT; anything
         * that is neither readable nor writable is reported as an error. */
        short revent = 0;
        if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
            revent = UA_FDEVENT_IN;
        } else if((epoll_events[i].events & EPOLLOUT) == EPOLLOUT) {
            revent = UA_FDEVENT_OUT;
        } else {
            revent = UA_FDEVENT_ERR;
        }

        /* Call the EventSource callback */
        rfd->eventSourceCB(rfd->es, rfd, revent);
    }
    return UA_STATUSCODE_GOOD;
}
1035 | | |
1036 | | #endif /* defined(UA_HAVE_EPOLL) */ |
1037 | | |
1038 | | #if defined(UA_ARCHITECTURE_WIN32) || defined(__APPLE__) |
1039 | | int UA_EventLoopPOSIX_pipe(SOCKET fds[2]) { |
1040 | | struct sockaddr_in inaddr; |
1041 | | memset(&inaddr, 0, sizeof(inaddr)); |
1042 | | inaddr.sin_family = AF_INET; |
1043 | | inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
1044 | | inaddr.sin_port = 0; |
1045 | | |
1046 | | SOCKET lst = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
1047 | | bind(lst, (struct sockaddr *)&inaddr, sizeof(inaddr)); |
1048 | | listen(lst, 1); |
1049 | | |
1050 | | struct sockaddr_storage addr; |
1051 | | memset(&addr, 0, sizeof(addr)); |
1052 | | int len = sizeof(addr); |
1053 | | getsockname(lst, (struct sockaddr*)&addr, &len); |
1054 | | |
1055 | | fds[0] = socket(AF_INET, SOCK_STREAM, 0); |
1056 | | int err = connect(fds[0], (struct sockaddr*)&addr, len); |
1057 | | fds[1] = accept(lst, 0, 0); |
1058 | | #ifdef UA_ARCHITECTURE_WIN32 |
1059 | | closesocket(lst); |
1060 | | #endif |
1061 | | #ifdef __APPLE__ |
1062 | | close(lst); |
1063 | | #endif |
1064 | | |
1065 | | UA_EventLoopPOSIX_setNoSigPipe(fds[0]); |
1066 | | UA_EventLoopPOSIX_setReusable(fds[0]); |
1067 | | UA_EventLoopPOSIX_setNonBlocking(fds[0]); |
1068 | | UA_EventLoopPOSIX_setNoSigPipe(fds[1]); |
1069 | | UA_EventLoopPOSIX_setReusable(fds[1]); |
1070 | | UA_EventLoopPOSIX_setNonBlocking(fds[1]); |
1071 | | return err; |
1072 | | } |
1073 | | #elif defined(__QNX__) |
/* QNX: use a plain pipe for the self-pipe. Both ends are switched to
 * non-blocking mode (matching the other platforms): the read end so that
 * flushSelfPipe's read loop terminates, the write end so that triggering
 * the self-pipe can never block the caller of UA_EventLoopPOSIX_cancel.
 * Existing file-status flags are preserved via F_GETFL (a bare F_SETFL
 * with only O_NONBLOCK would clear them). Returns 0 on success, -1 on
 * error; on error no fds are leaked. */
int UA_EventLoopPOSIX_pipe(int fds[2]) {
    if(pipe(fds) == -1)
        return -1;

    /* Add O_NONBLOCK on both ends, keeping the other flags intact */
    for(int i = 0; i < 2; i++) {
        int flags = fcntl(fds[i], F_GETFL);
        if(flags == -1 ||
           fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
            close(fds[0]);
            close(fds[1]);
            return -1;
        }
    }
    return 0;
}
1086 | | #endif |
1087 | | |
1088 | | void |
1089 | 0 | UA_EventLoopPOSIX_cancel(UA_EventLoopPOSIX *el) { |
1090 | | /* Nothing to do if the EventLoop is not executing */ |
1091 | 0 | if(!el->executing) |
1092 | 0 | return; |
1093 | | |
1094 | | /* Trigger the self-pipe */ |
1095 | | #ifdef UA_ARCHITECTURE_WIN32 |
1096 | | int err = send(el->selfpipe[1], ".", 1, 0); |
1097 | | #else |
1098 | 0 | ssize_t err = write(el->selfpipe[1], ".", 1); |
1099 | 0 | #endif |
1100 | 0 | if(err <= 0) { |
1101 | 0 | UA_LOG_SOCKET_ERRNO_WRAP( |
1102 | 0 | UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, |
1103 | 0 | "Eventloop\t| Error signaling self-pipe (%s)", errno_str)); |
1104 | 0 | } |
1105 | 0 | } |
1106 | | |
1107 | | #endif |