Coverage Report

Created: 2025-07-01 07:01

/src/open62541/src/server/ua_server_async.c
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
6
 *    Copyright 2019 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
7
 */
8
9
#include "ua_server_internal.h"
10
11
#if UA_MULTITHREADING >= 100
12
13
static void
14
0
UA_AsyncOperation_delete(UA_AsyncOperation *ar) {
15
0
    UA_CallMethodRequest_clear(&ar->request);
16
0
    UA_CallMethodResult_clear(&ar->response);
17
0
    UA_free(ar);
18
0
}
19
20
static void
21
sendAsyncResponse(UA_Server *server, UA_AsyncManager *am,
22
0
                  UA_AsyncResponse *ar) {
23
0
    UA_assert(ar->opCountdown == 0);
24
25
    /* Get the session */
26
0
    UA_Session *session = getSessionById(server, &ar->sessionId);
27
0
    if(!session) {
28
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
29
0
                       "Async Service: Session %N no longer exists", ar->sessionId);
30
0
        UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar);
31
0
        return;
32
0
    }
33
34
    /* Check the channel */
35
0
    UA_SecureChannel *channel = session->channel;
36
0
    if(!channel) {
37
0
        UA_LOG_WARNING_SESSION(server->config.logging, session,
38
0
                               "Async Service Response cannot be sent. "
39
0
                               "No SecureChannel for the session.");
40
0
        UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar);
41
0
        return;
42
0
    }
43
44
    /* Set the request handle */
45
0
    UA_ResponseHeader *responseHeader = (UA_ResponseHeader*)
46
0
        &ar->response.callResponse.responseHeader;
47
0
    responseHeader->requestHandle = ar->requestHandle;
48
49
    /* Send the Response */
50
0
    UA_StatusCode res = sendResponse(server, channel, ar->requestId,
51
0
                                     (UA_Response*)&ar->response,
52
0
                                     &UA_TYPES[UA_TYPES_CALLRESPONSE]);
53
0
    if(res != UA_STATUSCODE_GOOD) {
54
0
        UA_LOG_WARNING_SESSION(server->config.logging, session,
55
0
                               "Async Response for Req# %" PRIu32 " failed "
56
0
                               "with StatusCode %s", ar->requestId,
57
0
                               UA_StatusCode_name(res));
58
0
    }
59
0
    UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar);
60
0
}
61
62
static void
63
0
UA_AsyncManager_sendAsyncResponses(void *s, void *a) {
64
0
    UA_Server *server = (UA_Server*)s;
65
0
    UA_AsyncManager *am = (UA_AsyncManager*)a;
66
67
0
    UA_LOCK(&server->serviceMutex);
68
0
    UA_LOCK(&am->queueLock);
69
70
    /* Reset the delayed callback */
71
0
    UA_atomic_xchg((void**)&am->dc.callback, NULL);
72
73
    /* Send out ready responses */
74
0
    UA_AsyncResponse *ar, *temp;
75
0
    TAILQ_FOREACH_SAFE(ar, &am->asyncResponses, pointers, temp) {
76
0
        if(ar->opCountdown == 0)
77
0
            sendAsyncResponse(server, am, ar);
78
0
    }
79
80
0
    UA_UNLOCK(&am->queueLock);
81
0
    UA_UNLOCK(&server->serviceMutex);
82
0
}
83
84
static void
85
integrateResult(UA_Server *server, UA_AsyncManager *am,
86
0
                UA_AsyncOperation *ao) {
87
    /* Get the async response */
88
0
    UA_AsyncResponse *ar = ao->parent;
89
0
    ar->opCountdown -= 1;
90
91
    /* Move the UA_CallMethodResult to UA_CallResponse */
92
0
    ar->response.callResponse.results[ao->index] = ao->response;
93
0
    UA_CallMethodResult_init(&ao->response);
94
0
    UA_AsyncOperation_delete(ao);
95
96
    /* Reduce the number of overall outstanding async ops */
97
0
    am->opsCount--;
98
99
    /* Trigger the main server thread to send out responses */
100
0
    if(ar->opCountdown == 0 && am->dc.callback == NULL) {
101
0
        UA_EventLoop *el = server->config.eventLoop;
102
103
0
        am->dc.callback = UA_AsyncManager_sendAsyncResponses;
104
0
        am->dc.application = server;
105
0
        am->dc.context = am;
106
0
        el->addDelayedCallback(el, &am->dc);
107
108
        /* Wake up the EventLoop */
109
0
        el->cancel(el);
110
0
    }
111
0
}
112
113
/* Check if any operations have timed out */
114
static void
115
0
checkTimeouts(UA_Server *server, void *_) {
116
    /* Timeouts are not configured */
117
0
    if(server->config.asyncOperationTimeout <= 0.0)
118
0
        return;
119
120
0
    UA_EventLoop *el = server->config.eventLoop;
121
0
    UA_AsyncManager *am = &server->asyncManager;
122
0
    const UA_DateTime tNow = el->dateTime_nowMonotonic(el);
123
124
0
    UA_LOCK(&am->queueLock);
125
126
    /* Loop over the queue of dispatched ops */
127
0
    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
128
0
    UA_AsyncOperationQueue *q = &am->dispatchedQueue;
129
130
0
 iterate:
131
0
    TAILQ_FOREACH_SAFE(op, q, pointers, op_tmp) {
132
        /* The timeout has not passed. Also for all elements following in the queue. */
133
0
        if(tNow <= op->parent->timeout)
134
0
            break;
135
136
0
        TAILQ_REMOVE(q, op, pointers);
137
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
138
0
                       "Operation was removed due to a timeout");
139
140
        /* Mark operation as timed out integrate */
141
0
        op->response.statusCode = UA_STATUSCODE_BADTIMEOUT;
142
0
        integrateResult(server, am, op);
143
0
    }
144
145
0
    if(q == &am->dispatchedQueue) {
146
0
        q = &am->newQueue;
147
0
        goto iterate;
148
0
    }
149
150
0
    UA_UNLOCK(&am->queueLock);
151
0
}
152
153
void
154
225
UA_AsyncManager_init(UA_AsyncManager *am, UA_Server *server) {
155
225
    memset(am, 0, sizeof(UA_AsyncManager));
156
225
    TAILQ_INIT(&am->asyncResponses);
157
225
    TAILQ_INIT(&am->newQueue);
158
225
    TAILQ_INIT(&am->dispatchedQueue);
159
225
    UA_LOCK_INIT(&am->queueLock);
160
225
}
161
162
0
/* Register checkTimeouts as a repeated server callback. The callback id is
 * stored in am->checkTimeoutCallbackId so that UA_AsyncManager_stop can remove
 * it again. The status returned by addRepeatedCallback is not evaluated. */
void
UA_AsyncManager_start(UA_AsyncManager *am, UA_Server *server) {
    /* Interval of 1000.0, i.e. 1s */
    const double checkInterval = 1000.0;
    addRepeatedCallback(server, (UA_ServerCallback)checkTimeouts,
                        NULL, checkInterval, &am->checkTimeoutCallbackId);
}
168
169
0
/* Stop the background activity of the AsyncManager: remove the repeated
 * timeout check that UA_AsyncManager_start registered and, if one is pending,
 * the delayed callback that sends out finished responses.
 *
 * Queued operations and responses are not touched here. */
void
UA_AsyncManager_stop(UA_AsyncManager *am, UA_Server *server) {
    removeCallback(server, am->checkTimeoutCallbackId);

    /* No delayed callback is registered */
    if(!am->dc.callback)
        return;

    UA_EventLoop *el = server->config.eventLoop;
    el->removeDelayedCallback(el, &am->dc);

    /* Mark the delayed callback as not registered. integrateResult only
     * registers it when the pointer is NULL. Without the reset, no response
     * would ever be sent again after the manager is started a second time. */
    UA_atomic_xchg((void**)&am->dc.callback, NULL);
}
178
179
void
180
225
UA_AsyncManager_clear(UA_AsyncManager *am, UA_Server *server) {
181
225
    UA_AsyncOperation *ar, *ar_tmp;
182
183
    /* Clean up queues */
184
225
    UA_LOCK(&am->queueLock);
185
225
    TAILQ_FOREACH_SAFE(ar, &am->newQueue, pointers, ar_tmp) {
186
0
        TAILQ_REMOVE(&am->newQueue, ar, pointers);
187
0
        UA_AsyncOperation_delete(ar);
188
0
    }
189
225
    TAILQ_FOREACH_SAFE(ar, &am->dispatchedQueue, pointers, ar_tmp) {
190
0
        TAILQ_REMOVE(&am->dispatchedQueue, ar, pointers);
191
0
        UA_AsyncOperation_delete(ar);
192
0
    }
193
225
    UA_UNLOCK(&am->queueLock);
194
195
    /* Remove responses */
196
225
    UA_AsyncResponse *current, *temp;
197
225
    TAILQ_FOREACH_SAFE(current, &am->asyncResponses, pointers, temp) {
198
0
        UA_AsyncManager_removeAsyncResponse(am, current);
199
0
    }
200
201
    /* Delete all locks */
202
225
    UA_LOCK_DESTROY(&am->queueLock);
203
225
}
204
205
UA_StatusCode
206
UA_AsyncManager_createAsyncResponse(UA_AsyncManager *am, UA_Server *server,
207
                                    const UA_NodeId *sessionId,
208
                                    const UA_UInt32 requestId, const UA_UInt32 requestHandle,
209
                                    const UA_AsyncOperationType operationType,
210
0
                                    UA_AsyncResponse **outAr) {
211
0
    UA_AsyncResponse *newentry = (UA_AsyncResponse*)UA_calloc(1, sizeof(UA_AsyncResponse));
212
0
    if(!newentry)
213
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
214
215
0
    UA_StatusCode res = UA_NodeId_copy(sessionId, &newentry->sessionId);
216
0
    if(res != UA_STATUSCODE_GOOD) {
217
0
        UA_free(newentry);
218
0
        return res;
219
0
    }
220
221
0
    UA_EventLoop *el = server->config.eventLoop;
222
223
0
    newentry->requestId = requestId;
224
0
    newentry->requestHandle = requestHandle;
225
0
    newentry->timeout = el->dateTime_nowMonotonic(el);
226
0
    if(server->config.asyncOperationTimeout > 0.0)
227
0
        newentry->timeout += (UA_DateTime)
228
0
            (server->config.asyncOperationTimeout * (UA_DateTime)UA_DATETIME_MSEC);
229
0
    TAILQ_INSERT_TAIL(&am->asyncResponses, newentry, pointers);
230
231
0
    *outAr = newentry;
232
0
    return UA_STATUSCODE_GOOD;
233
0
}
234
235
/* Remove entry and free all allocated data */
236
void
237
0
UA_AsyncManager_removeAsyncResponse(UA_AsyncManager *am, UA_AsyncResponse *ar) {
238
0
    TAILQ_REMOVE(&am->asyncResponses, ar, pointers);
239
0
    UA_CallResponse_clear(&ar->response.callResponse);
240
0
    UA_NodeId_clear(&ar->sessionId);
241
0
    UA_free(ar);
242
0
}
243
244
/* Enqueue next MethodRequest */
245
UA_StatusCode
246
UA_AsyncManager_createAsyncOp(UA_AsyncManager *am, UA_Server *server,
247
                              UA_AsyncResponse *ar, size_t opIndex,
248
0
                              const UA_CallMethodRequest *opRequest) {
249
0
    if(server->config.maxAsyncOperationQueueSize != 0 &&
250
0
       am->opsCount >= server->config.maxAsyncOperationQueueSize) {
251
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
252
0
                       "UA_Server_SetNextAsyncMethod: Queue exceeds limit (%d).",
253
0
                       (int unsigned)server->config.maxAsyncOperationQueueSize);
254
0
        return UA_STATUSCODE_BADUNEXPECTEDERROR;
255
0
    }
256
257
0
    UA_AsyncOperation *ao = (UA_AsyncOperation*)UA_calloc(1, sizeof(UA_AsyncOperation));
258
0
    if(!ao) {
259
0
        UA_LOG_ERROR(server->config.logging, UA_LOGCATEGORY_SERVER,
260
0
                     "UA_Server_SetNextAsyncMethod: Mem alloc failed.");
261
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
262
0
    }
263
264
0
    UA_StatusCode result = UA_CallMethodRequest_copy(opRequest, &ao->request);
265
0
    if(result != UA_STATUSCODE_GOOD) {
266
0
        UA_LOG_ERROR(server->config.logging, UA_LOGCATEGORY_SERVER,
267
0
                     "UA_Server_SetAsyncMethodResult: UA_CallMethodRequest_copy failed.");
268
0
        UA_free(ao);
269
0
        return result;
270
0
    }
271
272
0
    UA_CallMethodResult_init(&ao->response);
273
0
    ao->index = opIndex;
274
0
    ao->parent = ar;
275
276
0
    UA_LOCK(&am->queueLock);
277
0
    TAILQ_INSERT_TAIL(&am->newQueue, ao, pointers);
278
0
    am->opsCount++;
279
0
    ar->opCountdown++;
280
0
    UA_UNLOCK(&am->queueLock);
281
282
0
    if(server->config.asyncOperationNotifyCallback)
283
0
        server->config.asyncOperationNotifyCallback(server);
284
285
0
    return UA_STATUSCODE_GOOD;
286
0
}
287
288
/* Get and remove next Method Call Request */
289
UA_Boolean
290
UA_Server_getAsyncOperationNonBlocking(UA_Server *server, UA_AsyncOperationType *type,
291
                                       const UA_AsyncOperationRequest **request,
292
0
                                       void **context, UA_DateTime *timeout) {
293
0
    UA_AsyncManager *am = &server->asyncManager;
294
295
0
    UA_Boolean bRV = false;
296
0
    *type = UA_ASYNCOPERATIONTYPE_INVALID;
297
0
    UA_LOCK(&am->queueLock);
298
0
    UA_AsyncOperation *ao = TAILQ_FIRST(&am->newQueue);
299
0
    if(ao) {
300
0
        TAILQ_REMOVE(&am->newQueue, ao, pointers);
301
0
        TAILQ_INSERT_TAIL(&am->dispatchedQueue, ao, pointers);
302
0
        *type = UA_ASYNCOPERATIONTYPE_CALL;
303
0
        *request = (UA_AsyncOperationRequest*)&ao->request;
304
0
        *context = (void*)ao;
305
0
        if(timeout)
306
0
            *timeout = ao->parent->timeout;
307
0
        bRV = true;
308
0
    }
309
0
    UA_UNLOCK(&am->queueLock);
310
311
0
    return bRV;
312
0
}
313
314
/* Worker submits Method Call Response */
315
void
316
UA_Server_setAsyncOperationResult(UA_Server *server,
317
                                  const UA_AsyncOperationResponse *response,
318
0
                                  void *context) {
319
0
    UA_AsyncManager *am = &server->asyncManager;
320
321
0
    UA_AsyncOperation *ao = (UA_AsyncOperation*)context;
322
0
    if(!ao) {
323
        /* Something went wrong. Not a good AsyncOp. */
324
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
325
0
                       "UA_Server_SetAsyncMethodResult: Invalid context");
326
0
        return;
327
0
    }
328
329
0
    UA_LOCK(&am->queueLock);
330
331
    /* See if the operation is still in the dispatched queue. Otherwise it has
332
     * been removed due to a timeout.
333
     *
334
     * TODO: Add a tree-structure for the dispatch queue. The linear lookup does
335
     * not scale. */
336
0
    UA_AsyncOperation *op = NULL;
337
0
    TAILQ_FOREACH(op, &am->dispatchedQueue, pointers) {
338
0
        if(op == ao)
339
0
            break;
340
0
    }
341
0
    if(!op) {
342
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
343
0
                       "UA_Server_SetAsyncMethodResult: The operation has timed out");
344
0
        UA_UNLOCK(&am->queueLock);
345
0
        return;
346
0
    }
347
348
    /* Copy the result into the internal AsyncOperation */
349
0
    UA_StatusCode result =
350
0
        UA_CallMethodResult_copy(&response->callMethodResult, &ao->response);
351
0
    if(result != UA_STATUSCODE_GOOD) {
352
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
353
0
                       "UA_Server_SetAsyncMethodResult: UA_CallMethodResult_copy failed.");
354
0
        ao->response.statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
355
0
    }
356
357
    /* Remove from the dispatch queue */
358
0
    TAILQ_REMOVE(&am->dispatchedQueue, ao, pointers);
359
360
0
    UA_LOG_DEBUG(server->config.logging, UA_LOGCATEGORY_SERVER,
361
0
                 "Return result in the server thread with %" PRIu32 " ops remaining",
362
0
                 op->parent->opCountdown);
363
364
0
    integrateResult(server, am, op);
365
366
0
    UA_UNLOCK(&am->queueLock);
367
0
}
368
369
/******************/
370
/* Server Methods */
371
/******************/
372
373
UA_StatusCode
374
UA_Server_setMethodNodeAsync(UA_Server *server, const UA_NodeId id,
375
0
                             UA_Boolean isAsync) {
376
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
377
0
    lockServer(server);
378
0
    UA_Node *node =
379
0
        UA_NODESTORE_GET_EDIT_SELECTIVE(server, &id, UA_NODEATTRIBUTESMASK_NONE,
380
0
                                        UA_REFERENCETYPESET_NONE,
381
0
                                        UA_BROWSEDIRECTION_INVALID);
382
0
    if(node) {
383
0
        if(node->head.nodeClass == UA_NODECLASS_METHOD)
384
0
            node->methodNode.async = isAsync;
385
0
        else
386
0
            res = UA_STATUSCODE_BADNODECLASSINVALID;
387
0
        UA_NODESTORE_RELEASE(server, node);
388
0
    } else {
389
0
        res = UA_STATUSCODE_BADNODEIDINVALID;
390
0
    }
391
0
    unlockServer(server);
392
0
    return res;
393
0
}
394
395
UA_StatusCode
396
UA_Server_processServiceOperationsAsync(UA_Server *server, UA_Session *session,
397
                                        UA_UInt32 requestId, UA_UInt32 requestHandle,
398
                                        UA_AsyncServiceOperation operationCallback,
399
                                        const size_t *requestOperations,
400
                                        const UA_DataType *requestOperationsType,
401
                                        size_t *responseOperations,
402
                                        const UA_DataType *responseOperationsType,
403
0
                                        UA_AsyncResponse **ar) {
404
0
    size_t ops = *requestOperations;
405
0
    if(ops == 0)
406
0
        return UA_STATUSCODE_BADNOTHINGTODO;
407
408
    /* Allocate the response array. No padding after size_t */
409
0
    void **respPos = (void**)((uintptr_t)responseOperations + sizeof(size_t));
410
0
    *respPos = UA_Array_new(ops, responseOperationsType);
411
0
    if(!*respPos)
412
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
413
0
    *responseOperations = ops;
414
415
    /* Finish / dispatch the operations. This may allocate a new AsyncResponse internally */
416
0
    uintptr_t respOp = (uintptr_t)*respPos;
417
0
    uintptr_t reqOp = *(uintptr_t*)((uintptr_t)requestOperations + sizeof(size_t));
418
0
    for(size_t i = 0; i < ops; i++) {
419
0
        operationCallback(server, session, requestId, requestHandle,
420
0
                          i, (void*)reqOp, (void*)respOp, ar);
421
0
        reqOp += requestOperationsType->memSize;
422
0
        respOp += responseOperationsType->memSize;
423
0
    }
424
425
0
    return UA_STATUSCODE_GOOD;
426
0
}
427
428
UA_UInt32
429
0
UA_AsyncManager_cancel(UA_Server *server, UA_Session *session, UA_UInt32 requestHandle) {
430
0
    UA_LOCK_ASSERT(&server->serviceMutex);
431
0
    UA_AsyncManager *am = &server->asyncManager;
432
433
0
    UA_LOCK(&am->queueLock);
434
435
    /* Loop over the newQueue, then the dispatchedQueue */
436
0
    UA_UInt32 count = 0;
437
0
    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
438
0
    UA_AsyncOperationQueue *q = &am->dispatchedQueue;
439
440
0
 iterate:
441
0
    TAILQ_FOREACH_SAFE(op, q, pointers, op_tmp) {
442
0
        UA_AsyncResponse *ar = op->parent;
443
0
        if(ar->requestHandle != requestHandle ||
444
0
           !UA_NodeId_equal(&session->sessionId, &ar->sessionId))
445
0
            continue;
446
447
        /* Found the matching request */
448
0
        TAILQ_REMOVE(q, op, pointers);
449
0
        count++;
450
451
        /* Set the status of the overall response */
452
0
        ar->response.callResponse.responseHeader.serviceResult =
453
0
            UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT;
454
455
        /* Set operation status and integrate */
456
0
        op->response.statusCode = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT;
457
0
        integrateResult(server, am, op);
458
0
    }
459
460
0
    if(q == &am->dispatchedQueue) {
461
0
        q = &am->newQueue;
462
0
        goto iterate;
463
0
    }
464
465
0
    UA_UNLOCK(&am->queueLock);
466
0
    return count;
467
0
}
468
469
#endif