Coverage Report

Created: 2025-11-09 06:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/src/server/ua_server_async.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2019 (c) Fraunhofer IOSB (Author: Klaus Schick)
6
 *    Copyright 2019, 2025 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
7
 */
8
9
#include "ua_server_internal.h"
10
11
/* Cancel the operation, but don't _clear it here */
12
static void
13
UA_AsyncOperation_cancel(UA_Server *server, UA_AsyncOperation *op,
14
0
                         UA_StatusCode status) {
15
0
    UA_ServerConfig *sc = &server->config;
16
0
    void *cancelPtr = NULL;
17
18
    /* Set the status and get the pointer that identifies the operation */
19
0
    switch(op->asyncOperationType) {
20
0
    case UA_ASYNCOPERATIONTYPE_READ_REQUEST:
21
0
        cancelPtr = op->output.read;
22
0
        op->output.read->hasStatus = true;
23
0
        op->output.read->status = status;
24
0
        break;
25
0
    case UA_ASYNCOPERATIONTYPE_READ_DIRECT:
26
0
        cancelPtr = &op->output.directRead;
27
0
        op->output.directRead.hasStatus = true;
28
0
        op->output.directRead.status = status;
29
0
        break;
30
0
    case UA_ASYNCOPERATIONTYPE_WRITE_REQUEST:
31
0
        cancelPtr = &op->context.writeValue.value;
32
0
        *op->output.write = status;
33
0
        break;
34
0
    case UA_ASYNCOPERATIONTYPE_WRITE_DIRECT:
35
0
        cancelPtr = &op->context.writeValue.value;
36
0
        op->output.directWrite = status;
37
0
        break;
38
0
    case UA_ASYNCOPERATIONTYPE_CALL_REQUEST:
39
        /* outputArguments is always an allocated pointer, also if the length is zero */
40
0
        cancelPtr = op->output.call->outputArguments;
41
0
        op->output.call->statusCode = status;
42
0
        break;
43
0
    case UA_ASYNCOPERATIONTYPE_CALL_DIRECT:
44
        /* outputArguments is always an allocated pointer, also if the length is zero */
45
0
        cancelPtr = op->output.directCall.outputArguments;
46
0
        op->output.directCall.statusCode = status;
47
0
        break;
48
0
    default: UA_assert(false); return;
49
0
    }
50
51
    /* Notify the application that it must no longer set the async result */
52
0
    if(sc->asyncOperationCancelCallback)
53
0
        sc->asyncOperationCancelCallback(server, cancelPtr);
54
0
}
55
56
static void
57
0
UA_AsyncOperation_delete(UA_AsyncOperation *op) {
58
0
    UA_assert(op->asyncOperationType >= UA_ASYNCOPERATIONTYPE_CALL_DIRECT);
59
0
    switch(op->asyncOperationType) {
60
0
    case UA_ASYNCOPERATIONTYPE_READ_DIRECT:
61
0
        UA_DataValue_clear(&op->output.directRead);
62
0
        break;
63
0
    case UA_ASYNCOPERATIONTYPE_WRITE_DIRECT:
64
0
        break;
65
0
    case UA_ASYNCOPERATIONTYPE_CALL_DIRECT:
66
0
        UA_CallMethodResult_clear(&op->output.directCall);
67
0
        break;
68
0
    default: UA_assert(false); break;
69
0
    }
70
0
    UA_free(op);
71
0
}
72
73
static void
74
0
UA_AsyncResponse_delete(UA_AsyncResponse *ar) {
75
0
    UA_NodeId_clear(&ar->sessionId);
76
77
    /* Clean up the results array last. Because the results array memory also
78
     * includes ar. */
79
0
    void *arr = NULL;
80
0
    size_t arrSize = 0;
81
0
    const UA_DataType *arrType;
82
0
    if(ar->responseType == &UA_TYPES[UA_TYPES_CALLRESPONSE]) {
83
0
        arr = ar->response.callResponse.results;
84
0
        arrSize = ar->response.callResponse.resultsSize;
85
0
        ar->response.callResponse.results = NULL;
86
0
        ar->response.callResponse.resultsSize = 0;
87
0
        arrType = &UA_TYPES[UA_TYPES_CALLMETHODRESULT];
88
0
    } else if(ar->responseType == &UA_TYPES[UA_TYPES_READRESPONSE]) {
89
0
        arr = ar->response.readResponse.results;
90
0
        arrSize = ar->response.readResponse.resultsSize;
91
0
        ar->response.readResponse.results = NULL;
92
0
        ar->response.readResponse.resultsSize = 0;
93
0
        arrType = &UA_TYPES[UA_TYPES_DATAVALUE];
94
0
    } else /* if(ar->responseType == &UA_TYPES[UA_TYPES_WRITERESPONSE]) */ {
95
0
        UA_assert(ar->responseType == &UA_TYPES[UA_TYPES_WRITERESPONSE]);
96
0
        arr = ar->response.writeResponse.results;
97
0
        arrSize = ar->response.writeResponse.resultsSize;
98
0
        ar->response.writeResponse.results = NULL;
99
0
        ar->response.writeResponse.resultsSize = 0;
100
0
        arrType = &UA_TYPES[UA_TYPES_STATUSCODE];
101
0
    }
102
0
    UA_clear(&ar->response.callResponse, ar->responseType);
103
0
    UA_Array_delete(arr, arrSize, arrType);
104
0
}
105
106
static void
107
notifyServiceEnd(UA_Server *server, UA_AsyncResponse *ar,
108
0
                 UA_Session *session, UA_SecureChannel *sc) {
109
    /* Nothing to do? */
110
0
    UA_ServerConfig *config = UA_Server_getConfig(server);
111
0
    if(!config->globalNotificationCallback && !config->serviceNotificationCallback)
112
0
        return;
113
114
    /* Collect the payload */
115
0
    UA_NodeId sessionId = (session) ? session->sessionId : UA_NODEID_NULL;
116
0
    UA_UInt32 secureChannelId = (sc) ? sc->securityToken.channelId : 0;
117
0
    UA_NodeId serviceTypeId;
118
0
    if(ar->responseType == &UA_TYPES[UA_TYPES_CALLRESPONSE]) {
119
0
        serviceTypeId = UA_TYPES[UA_TYPES_CALLREQUEST].typeId;
120
0
    } else if(ar->responseType == &UA_TYPES[UA_TYPES_READRESPONSE]) {
121
0
        serviceTypeId = UA_TYPES[UA_TYPES_READREQUEST].typeId;
122
0
    } else /* if(ar->responseType == &UA_TYPES[UA_TYPES_WRITERESPONSE]) */ {
123
0
        serviceTypeId = UA_TYPES[UA_TYPES_WRITEREQUEST].typeId;
124
0
    }
125
126
    /* Notify the application */
127
0
    static UA_THREAD_LOCAL UA_KeyValuePair notifyPayload[4] = {
128
0
        {{0, UA_STRING_STATIC("securechannel-id")}, {0}},
129
0
        {{0, UA_STRING_STATIC("session-id")}, {0}},
130
0
        {{0, UA_STRING_STATIC("request-id")}, {0}},
131
0
        {{0, UA_STRING_STATIC("service-type")}, {0}}
132
0
    };
133
0
    UA_KeyValueMap notifyPayloadMap = {4, notifyPayload};
134
0
    if(config->serviceNotificationCallback || config->globalNotificationCallback) {
135
0
        UA_Variant_setScalar(&notifyPayload[0].value, &secureChannelId,
136
0
                             &UA_TYPES[UA_TYPES_UINT32]);
137
0
        UA_Variant_setScalar(&notifyPayload[1].value, &sessionId,
138
0
                             &UA_TYPES[UA_TYPES_NODEID]);
139
0
        UA_Variant_setScalar(&notifyPayload[2].value, &ar->requestId,
140
0
                             &UA_TYPES[UA_TYPES_UINT32]);
141
0
        UA_Variant_setScalar(&notifyPayload[3].value, &serviceTypeId,
142
0
                             &UA_TYPES[UA_TYPES_NODEID]);
143
0
    }
144
145
0
    UA_ApplicationNotificationType nt = UA_APPLICATIONNOTIFICATIONTYPE_SERVICE_END;
146
0
    if(config->serviceNotificationCallback)
147
0
        config->serviceNotificationCallback(server, nt, notifyPayloadMap);
148
0
    if(config->globalNotificationCallback)
149
0
        config->globalNotificationCallback(server, nt, notifyPayloadMap);
150
0
}
151
152
static void
153
0
sendAsyncResponse(UA_Server *server, UA_AsyncResponse *ar) {
154
0
    UA_assert(ar->opCountdown == 0);
155
156
    /* Get the session */
157
0
    UA_Session *session = getSessionById(server, &ar->sessionId);
158
0
    UA_SecureChannel *channel = (session) ? session->channel : NULL;
159
160
    /* Notify that processing the service has ended */
161
0
    notifyServiceEnd(server, ar, session, channel);
162
163
    /* Check the session */
164
0
    if(!session) {
165
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
166
0
                       "Async Service: Session %N no longer exists", ar->sessionId);
167
0
        return;
168
0
    }
169
170
    /* Check the channel */
171
0
    if(!channel) {
172
0
        UA_LOG_WARNING_SESSION(server->config.logging, session,
173
0
                               "Async Service Response cannot be sent. "
174
0
                               "No SecureChannel for the session.");
175
0
        return;
176
0
    }
177
178
    /* Set the request handle */
179
0
    UA_ResponseHeader *responseHeader = (UA_ResponseHeader*)
180
0
        &ar->response.callResponse.responseHeader;
181
0
    responseHeader->requestHandle = ar->requestHandle;
182
183
    /* Send the Response */
184
0
    UA_StatusCode res = sendResponse(server, channel, ar->requestId,
185
0
                                     (UA_Response*)&ar->response, ar->responseType);
186
0
    if(res != UA_STATUSCODE_GOOD) {
187
0
        UA_LOG_WARNING_SESSION(server->config.logging, session,
188
0
                               "Async Response for Req# %" PRIu32 " failed "
189
0
                               "with StatusCode %s", ar->requestId,
190
0
                               UA_StatusCode_name(res));
191
0
    }
192
0
}
193
194
static void
195
0
directOpCallback(UA_Server *server, UA_AsyncOperation *op) {
196
0
    switch(op->asyncOperationType) {
197
0
    case UA_ASYNCOPERATIONTYPE_READ_DIRECT:
198
0
        op->handling.callback.method.read(server,
199
0
                                          op->handling.callback.context,
200
0
                                          &op->output.directRead);
201
0
        break;
202
0
    case UA_ASYNCOPERATIONTYPE_WRITE_DIRECT:
203
0
        op->handling.callback.method.write(server,
204
0
                                           op->handling.callback.context,
205
0
                                           op->output.directWrite);
206
0
        break;
207
0
    case UA_ASYNCOPERATIONTYPE_CALL_DIRECT:
208
0
        op->handling.callback.method.call(server,
209
0
                                          op->handling.callback.context,
210
0
                                          &op->output.directCall);
211
0
        break;
212
0
    default: UA_assert(false); break;
213
0
    }
214
0
}
215
216
/* Called from the EventLoop via a delayed callback */
217
static void
218
528
UA_AsyncManager_processReady(UA_Server *server, UA_AsyncManager *am) {
219
528
    UA_LOCK(&server->serviceMutex);
220
221
    /* Reset the delayed callback */
222
528
    UA_atomic_xchg((void**)&am->dc.callback, NULL);
223
224
    /* Process ready direct operations and free them */
225
528
    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
226
528
    TAILQ_FOREACH_SAFE(op, &am->readyOps, pointers, op_tmp) {
227
0
        TAILQ_REMOVE(&am->readyOps, op, pointers);
228
0
        am->opsCount--;
229
0
        directOpCallback(server, op);
230
0
        UA_AsyncOperation_delete(op);
231
0
    }
232
233
    /* Send out ready responses */
234
528
    UA_AsyncResponse *ar, *temp;
235
528
    TAILQ_FOREACH_SAFE(ar, &am->readyResponses, pointers, temp) {
236
0
        TAILQ_REMOVE(&am->readyResponses, ar, pointers);
237
0
        sendAsyncResponse(server, ar);
238
0
        UA_AsyncResponse_delete(ar);
239
0
    }
240
241
528
    UA_UNLOCK(&server->serviceMutex);
242
528
}
243
244
static void
245
0
processOperationResult(UA_Server *server, UA_AsyncOperation *op) {
246
0
    UA_AsyncManager *am = &server->asyncManager;
247
0
    if(op->asyncOperationType >= UA_ASYNCOPERATIONTYPE_CALL_DIRECT) {
248
        /* Direct operation */
249
0
        TAILQ_REMOVE(&am->waitingOps, op, pointers);
250
0
        TAILQ_INSERT_TAIL(&am->readyOps, op, pointers);
251
0
    } else {
252
        /* Part of a service request */
253
0
        TAILQ_REMOVE(&am->waitingOps, op, pointers);
254
0
        am->opsCount--;
255
256
0
        UA_AsyncResponse *ar = op->handling.response;
257
0
        ar->opCountdown -= 1;
258
0
        if(ar->opCountdown > 0)
259
0
            return;
260
261
        /* Enqueue ar in the readyResponses */
262
0
        TAILQ_REMOVE(&am->waitingResponses, ar, pointers);
263
0
        TAILQ_INSERT_TAIL(&am->readyResponses, ar, pointers);
264
0
    }
265
266
    /* Trigger the main server thread to handle ready operations and responses */
267
0
    if(am->dc.callback == NULL) {
268
0
        UA_EventLoop *el = server->config.eventLoop;
269
0
        am->dc.callback = (UA_Callback)UA_AsyncManager_processReady;
270
0
        am->dc.application = server;
271
0
        am->dc.context = am;
272
0
        el->addDelayedCallback(el, &am->dc);
273
0
        el->cancel(el); /* Wake up the EventLoop if currently waiting in select() */
274
0
    }
275
0
}
276
277
/* Check if any operations have timed out */
278
static void
279
0
checkTimeouts(UA_Server *server, void *_) {
280
    /* Timeouts are not configured */
281
0
    if(server->config.asyncOperationTimeout <= 0.0)
282
0
        return;
283
284
0
    lockServer(server);
285
286
0
    UA_EventLoop *el = server->config.eventLoop;
287
0
    UA_AsyncManager *am = &server->asyncManager;
288
0
    const UA_DateTime tNow = el->dateTime_nowMonotonic(el);
289
290
    /* Loop over the waiting ops */
291
0
    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
292
0
    TAILQ_FOREACH_SAFE(op, &am->waitingOps, pointers, op_tmp) {
293
        /* Check the timeout */
294
0
        if(op->asyncOperationType <= UA_ASYNCOPERATIONTYPE_WRITE_REQUEST) {
295
0
            if(tNow <= op->handling.response->timeout)
296
0
                continue;
297
0
        } else {
298
0
            if(tNow <= op->handling.callback.timeout)
299
0
                continue;
300
0
        }
301
302
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
303
0
                       "Operation was removed due to a timeout");
304
305
        /* Mark operation as timed out integrate */
306
0
        UA_AsyncOperation_cancel(server, op, UA_STATUSCODE_BADTIMEOUT);
307
0
        processOperationResult(server, op);
308
0
    }
309
310
0
    unlockServer(server);
311
0
}
312
313
void
314
528
UA_AsyncManager_init(UA_AsyncManager *am, UA_Server *server) {
315
528
    memset(am, 0, sizeof(UA_AsyncManager));
316
528
    TAILQ_INIT(&am->waitingResponses);
317
528
    TAILQ_INIT(&am->readyResponses);
318
528
    TAILQ_INIT(&am->waitingOps);
319
528
    TAILQ_INIT(&am->readyOps);
320
528
}
321
322
279
/* Register checkTimeouts as a repeated callback with a 1000 ms interval. The
 * callback id is stored in the AsyncManager so it can be removed on stop. */
void
UA_AsyncManager_start(UA_AsyncManager *am, UA_Server *server) {
    const UA_Double intervalMs = 1000.0;
    addRepeatedCallback(server, (UA_ServerCallback)checkTimeouts, NULL,
                        intervalMs, &am->checkTimeoutCallbackId);
}
328
329
279
/* Remove the repeated timeout check and, if one is registered, the pending
 * delayed callback for processing ready operations. */
void
UA_AsyncManager_stop(UA_AsyncManager *am, UA_Server *server) {
    removeCallback(server, am->checkTimeoutCallbackId);

    /* No delayed callback registered */
    if(!am->dc.callback)
        return;

    UA_EventLoop *el = server->config.eventLoop;
    el->removeDelayedCallback(el, &am->dc);
}
336
337
void
338
528
UA_AsyncManager_clear(UA_AsyncManager *am, UA_Server *server) {
339
528
    UA_LOCK_ASSERT(&server->serviceMutex);
340
341
    /* Cancel all operations. This moves all operations and responses into the
342
     * ready state. */
343
528
    UA_AsyncOperation *op, *op_tmp;
344
528
    TAILQ_FOREACH_SAFE(op, &am->waitingOps, pointers, op_tmp) {
345
0
        UA_AsyncOperation_cancel(server, op, UA_STATUSCODE_BADSHUTDOWN);
346
0
        processOperationResult(server, op);
347
0
    }
348
349
    /* This sends out/notifies and removes all direct operations and async requests */
350
528
    UA_AsyncManager_processReady(server, am);
351
528
    UA_assert(am->opsCount == 0);
352
528
}
353
354
UA_UInt32
355
0
UA_AsyncManager_cancel(UA_Server *server, UA_Session *session, UA_UInt32 requestHandle) {
356
0
    UA_LOCK_ASSERT(&server->serviceMutex);
357
358
    /* Loop over all waiting operations */
359
0
    UA_UInt32 count = 0;
360
0
    UA_AsyncOperation *op, *op_tmp;
361
0
    UA_AsyncManager *am = &server->asyncManager;
362
0
    TAILQ_FOREACH_SAFE(op, &am->waitingOps, pointers, op_tmp) {
363
0
        UA_AsyncResponse *ar = op->handling.response;
364
0
        if(ar->requestHandle != requestHandle ||
365
0
           !UA_NodeId_equal(&session->sessionId, &ar->sessionId))
366
0
            continue;
367
368
0
        count++; /* Found a matching request */
369
370
        /* Set the status of the overall response */
371
0
        ar->response.callResponse.responseHeader.serviceResult =
372
0
            UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT;
373
374
        /* Notify, set operation status and integrate */
375
0
        UA_AsyncOperation_cancel(server, op, UA_STATUSCODE_BADOPERATIONABANDONED);
376
0
        processOperationResult(server, op);
377
0
    }
378
379
0
    return count;
380
0
}
381
382
static void
383
persistAsyncResponse(UA_Server *server, UA_Session *session,
384
0
                     void *response, UA_AsyncResponse *ar) {
385
0
    UA_LOCK_ASSERT(&server->serviceMutex);
386
0
    UA_AsyncManager *am = &server->asyncManager;
387
388
    /* Pending results, attach the AsyncResponse to the AsyncManager. RequestId
389
     * and -Handle are set in the AsyncManager before processing the request. */
390
0
    ar->requestId = am->currentRequestId;
391
0
    ar->requestHandle = am->currentRequestHandle;
392
0
    ar->sessionId = session->sessionId;
393
0
    ar->timeout = UA_INT64_MAX;
394
395
0
    UA_EventLoop *el = server->config.eventLoop;
396
0
    if(server->config.asyncOperationTimeout > 0.0)
397
0
        ar->timeout = el->dateTime_nowMonotonic(el) + (UA_DateTime)
398
0
            (server->config.asyncOperationTimeout * (UA_DateTime)UA_DATETIME_MSEC);
399
400
    /* Move the response content to the AsyncResponse */
401
0
    memcpy(&ar->response, response, ar->responseType->memSize);
402
0
    UA_init(response, ar->responseType);
403
404
    /* Enqueue the ar */
405
0
    TAILQ_INSERT_TAIL(&am->waitingResponses, ar, pointers);
406
0
}
407
408
static void
409
persistAsyncResponseOperation(UA_Server *server, UA_AsyncOperation *op,
410
                              UA_AsyncOperationType opType, UA_AsyncResponse *ar,
411
0
                              void *outputPtr) {
412
    /* Set up the async operation */
413
0
    op->asyncOperationType = opType;
414
0
    op->handling.response = ar;
415
0
    op->output.read = (UA_DataValue*)outputPtr;
416
417
    /* Not enough resources to store the async operation */
418
0
    UA_AsyncManager *am = &server->asyncManager;
419
0
    if(server->config.maxAsyncOperationQueueSize != 0 &&
420
0
       am->opsCount >= server->config.maxAsyncOperationQueueSize) {
421
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
422
0
                       "Cannot create async operation: Queue exceeds limit (%d).",
423
0
                       (int unsigned)server->config.maxAsyncOperationQueueSize);
424
        /* No need to call processOperationResult or UA_AsyncOperation_delete
425
         * here. The response already has the status code integrated. */
426
0
        UA_AsyncOperation_cancel(server, op, UA_STATUSCODE_BADTOOMANYOPERATIONS);
427
0
        return;
428
0
    }
429
430
    /* Enqueue the asyncop in the async manager */
431
0
    TAILQ_INSERT_TAIL(&am->waitingOps, op, pointers);
432
0
    ar->opCountdown++;
433
0
    am->opsCount++;
434
0
}
435
436
static UA_StatusCode
437
persistAsyncDirectOperation(UA_Server *server, UA_AsyncOperation *op,
438
                            UA_AsyncOperationType opType, void *context,
439
0
                            uintptr_t callback, UA_DateTime timeout) {
440
    /* Set up the async operation */
441
0
    op->asyncOperationType = opType;
442
0
    op->handling.callback.timeout = timeout;
443
0
    op->handling.callback.context = context;
444
0
    op->handling.callback.method.read = (UA_ServerAsyncReadResultCallback)callback;
445
446
    /* Not enough resources to store the async operation */
447
0
    UA_AsyncManager *am = &server->asyncManager;
448
0
    if(server->config.maxAsyncOperationQueueSize != 0 &&
449
0
       am->opsCount >= server->config.maxAsyncOperationQueueSize) {
450
0
        UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
451
0
                       "Cannot create async operation: Queue exceeds limit (%d).",
452
0
                       (int unsigned)server->config.maxAsyncOperationQueueSize);
453
0
        UA_AsyncOperation_cancel(server, op, UA_STATUSCODE_BADTOOMANYOPERATIONS);
454
0
        UA_AsyncOperation_delete(op);
455
0
        return UA_STATUSCODE_BADTOOMANYOPERATIONS;
456
0
    }
457
458
    /* Enqueue the asyncop in the async manager */
459
0
    TAILQ_INSERT_TAIL(&am->waitingOps, op, pointers);
460
0
    am->opsCount++;
461
0
    return UA_STATUSCODE_GOOD;
462
0
}
463
464
void
465
async_cancel(UA_Server *server, void *context, UA_StatusCode status,
466
0
             UA_Boolean cancelSynchronous) {
467
0
    UA_AsyncManager *am = &server->asyncManager;
468
0
    UA_AsyncOperation *op = NULL, *op_tmp = NULL;
469
470
    /* Cancel operations that are still waiting for the result */
471
0
    TAILQ_FOREACH_SAFE(op, &am->waitingOps, pointers, op_tmp) {
472
0
        if(op->handling.callback.context != context)
473
0
            continue;
474
475
        /* Cancel the operation. This sets the StatusCode and calls the
476
         * asyncOperationCancelCallback. */
477
0
        UA_AsyncOperation_cancel(server, op, status);
478
479
        /* Call the result-callback of the local async operation.
480
         * Right away or in the next EventLoop iteration. */
481
0
        if(cancelSynchronous) {
482
0
            TAILQ_REMOVE(&am->waitingOps, op, pointers);
483
0
            am->opsCount--;
484
0
            directOpCallback(server, op);
485
0
            UA_AsyncOperation_delete(op);
486
0
        } else {
487
0
            processOperationResult(server, op);
488
0
        }
489
0
    }
490
491
    /* All "ready" operations get processed in the next EventLoop iteration anyway */
492
0
    if(!cancelSynchronous)
493
0
        return;
494
495
    /* Process matching ready operations synchronously and delete them */
496
0
    TAILQ_FOREACH_SAFE(op, &am->readyOps, pointers, op_tmp) {
497
0
        if(op->handling.callback.context != context)
498
0
            continue;
499
0
        TAILQ_REMOVE(&am->readyOps, op, pointers);
500
0
        am->opsCount--;
501
0
        directOpCallback(server, op);
502
0
        UA_AsyncOperation_delete(op);
503
0
    }
504
0
}
505
506
void
507
UA_Server_cancelAsync(UA_Server *server, void *context, UA_StatusCode status,
508
0
                      UA_Boolean synchronousResultCallback) {
509
0
    lockServer(server);
510
0
    async_cancel(server, context, status, synchronousResultCallback);
511
0
    unlockServer(server);
512
0
}
513
514
/********/
515
/* Read */
516
/********/
517
518
UA_Boolean
519
Service_Read(UA_Server *server, UA_Session *session, const UA_ReadRequest *request,
520
0
             UA_ReadResponse *response) {
521
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session, "Processing ReadRequest");
522
0
    UA_LOCK_ASSERT(&server->serviceMutex);
523
524
    /* Check if the timestampstoreturn is valid */
525
0
    if(request->timestampsToReturn > UA_TIMESTAMPSTORETURN_NEITHER) {
526
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTIMESTAMPSTORETURNINVALID;
527
0
        return true;
528
0
    }
529
530
    /* Check if maxAge is valid */
531
0
    if(request->maxAge < 0) {
532
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADMAXAGEINVALID;
533
0
        return true;
534
0
    }
535
536
    /* Check if there are too many operations */
537
0
    if(server->config.maxNodesPerRead != 0 &&
538
0
       request->nodesToReadSize > server->config.maxNodesPerRead) {
539
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYOPERATIONS;
540
0
        return true;
541
0
    }
542
543
    /* Check if there are no operations */
544
0
    if(request->nodesToReadSize == 0) {
545
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
546
0
        return true;
547
0
    }
548
549
    /* Allocate the response array. The AsyncResponse and AsyncOperations are
550
     * added to the back. So they get cleaned up automatically if none of the
551
     * calls are async. */
552
0
    size_t opsLen = sizeof(UA_DataValue) * request->nodesToReadSize;
553
0
    size_t len = opsLen + sizeof(UA_AsyncResponse) +
554
0
        (sizeof(UA_AsyncOperation) * request->nodesToReadSize);
555
0
    response->results = (UA_DataValue*)UA_calloc(1, len);
556
0
    if(!response->results) {
557
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
558
0
        return true;
559
0
    }
560
0
    response->resultsSize = request->nodesToReadSize;
561
562
    /* Execute the operations */
563
0
    UA_AsyncResponse *ar = (UA_AsyncResponse*)&response->results[response->resultsSize];
564
0
    UA_AsyncOperation *aopArray = (UA_AsyncOperation*)&ar[1];
565
0
    for(size_t i = 0; i < request->nodesToReadSize; i++) {
566
0
        UA_Boolean done = Operation_Read(server, session, request->timestampsToReturn,
567
0
                                         &request->nodesToRead[i], &response->results[i]);
568
0
        if(!done)
569
0
            persistAsyncResponseOperation(server, &aopArray[i],
570
0
                                          UA_ASYNCOPERATIONTYPE_READ_REQUEST,
571
0
                                          ar, &response->results[i]);
572
0
    }
573
574
    /* If async operations are pending, persist them and signal the service is
575
     * not done */
576
0
    if(ar->opCountdown > 0) {
577
0
        ar->responseType = &UA_TYPES[UA_TYPES_READRESPONSE];
578
0
        persistAsyncResponse(server, session, response, ar);
579
0
    }
580
0
    return (ar->opCountdown == 0);
581
0
}
582
583
UA_StatusCode
584
read_async(UA_Server *server, UA_Session *session, const UA_ReadValueId *operation,
585
           UA_TimestampsToReturn ttr, UA_ServerAsyncReadResultCallback callback,
586
0
           void *context, UA_UInt32 timeout) {
587
    /* Allocate the async operation. Do this first as we need the pointer to the
588
     * datavalue to be stable.*/
589
0
    UA_AsyncOperation *op = (UA_AsyncOperation*)UA_calloc(1, sizeof(UA_AsyncOperation));
590
0
    if(!op)
591
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
592
593
0
    UA_DateTime timeoutDate = UA_INT64_MAX;
594
0
    if(timeout > 0) {
595
0
        UA_EventLoop *el = server->config.eventLoop;
596
0
        const UA_DateTime tNow = el->dateTime_nowMonotonic(el);
597
0
        timeoutDate = tNow + (timeout * UA_DATETIME_MSEC);
598
0
    }
599
600
    /* Call the operation */
601
0
    UA_Boolean done = Operation_Read(server, session, ttr, operation, &op->output.directRead);
602
0
    if(!done)
603
0
        return persistAsyncDirectOperation(server, op, UA_ASYNCOPERATIONTYPE_READ_DIRECT,
604
0
                                           context, (uintptr_t)callback, timeoutDate);
605
606
0
    callback(server, context, &op->output.directRead);
607
0
    UA_DataValue_clear(&op->output.directRead);
608
0
    UA_free(op);
609
0
    return UA_STATUSCODE_GOOD;
610
0
}
611
612
UA_StatusCode
613
UA_Server_read_async(UA_Server *server, const UA_ReadValueId *operation,
614
                     UA_TimestampsToReturn ttr, UA_ServerAsyncReadResultCallback callback,
615
0
                     void *context, UA_UInt32 timeout) {
616
0
    lockServer(server);
617
0
    UA_StatusCode res = read_async(server, &server->adminSession, operation,
618
0
                                   ttr, callback, context, timeout);
619
0
    unlockServer(server);
620
0
    return res;
621
0
}
622
623
UA_StatusCode
624
0
UA_Server_setAsyncReadResult(UA_Server *server, UA_DataValue *result) {
625
0
    lockServer(server);
626
0
    UA_AsyncManager *am = &server->asyncManager;
627
0
    UA_AsyncOperation *op = NULL;
628
0
    TAILQ_FOREACH(op, &am->waitingOps, pointers) {
629
0
        if(op->output.read == result || &op->output.directRead == result) {
630
0
            processOperationResult(server, op);
631
0
            break;
632
0
        }
633
0
    }
634
0
    unlockServer(server);
635
0
    return (op) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADNOTFOUND;
636
0
}
637
638
/*********/
639
/* Write */
640
/*********/
641
642
UA_Boolean
643
Service_Write(UA_Server *server, UA_Session *session,
644
0
              const UA_WriteRequest *request, UA_WriteResponse *response) {
645
0
    UA_assert(session != NULL);
646
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
647
0
                         "Processing WriteRequest");
648
0
    UA_LOCK_ASSERT(&server->serviceMutex);
649
650
0
    if(server->config.maxNodesPerWrite != 0 &&
651
0
       request->nodesToWriteSize > server->config.maxNodesPerWrite) {
652
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYOPERATIONS;
653
0
        return true;
654
0
    }
655
656
0
    if(request->nodesToWriteSize == 0) {
657
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
658
0
        return true;
659
0
    }
660
661
    /* Allocate the response array. The AsyncResponse and AsyncOperations are
662
     * added to the back. So they get cleaned up automatically if none of the
663
     * calls are async. */
664
0
    size_t opsLen = sizeof(UA_StatusCode) * request->nodesToWriteSize;
665
0
    size_t len = opsLen + sizeof(UA_AsyncResponse) +
666
0
        (sizeof(UA_AsyncOperation) * request->nodesToWriteSize);
667
0
    response->results = (UA_StatusCode*)UA_calloc(1, len);
668
0
    if(!response->results) {
669
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
670
0
        return true;
671
0
    }
672
0
    response->resultsSize = request->nodesToWriteSize;
673
674
    /* Execute the operations */
675
0
    UA_AsyncResponse *ar = (UA_AsyncResponse*)&response->results[response->resultsSize];
676
0
    UA_AsyncOperation *aopArray = (UA_AsyncOperation*)&ar[1];
677
0
    for(size_t i = 0; i < request->nodesToWriteSize; i++) {
678
        /* Ensure a stable pointer for the writevalue. Doesn't get written to,
679
         * just used for the lookup of the async operation later on.
680
         * The original writeValue might be _clear'ed before the lookup. */
681
0
        UA_AsyncOperation *aop = &aopArray[i];
682
0
        aop->context.writeValue = request->nodesToWrite[i];
683
0
        UA_Boolean done = Operation_Write(server, session, &aop->context.writeValue,
684
0
                                          &response->results[i]);
685
0
        if(!done)
686
0
            persistAsyncResponseOperation(server, aop, UA_ASYNCOPERATIONTYPE_WRITE_REQUEST,
687
0
                                          ar, &response->results[i]);
688
0
    }
689
690
    /* If async operations are pending, persist them and signal the service is
691
     * not done */
692
0
    if(ar->opCountdown > 0) {
693
0
        ar->responseType = &UA_TYPES[UA_TYPES_WRITERESPONSE];
694
0
        persistAsyncResponse(server, session, response, ar);
695
0
    }
696
0
    return (ar->opCountdown == 0);
697
0
}
698
699
UA_StatusCode
700
write_async(UA_Server *server, UA_Session *session, const UA_WriteValue *operation,
701
            UA_ServerAsyncWriteResultCallback callback, void *context,
702
0
            UA_UInt32 timeout) {
703
    /* Allocate the async operation. Do this first as we need the pointer to the
704
     * datavalue to be stable.*/
705
0
    UA_AsyncOperation *op = (UA_AsyncOperation*)UA_calloc(1, sizeof(UA_AsyncOperation));
706
0
    if(!op)
707
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
708
709
0
    UA_DateTime timeoutDate = UA_INT64_MAX;
710
0
    if(timeout > 0) {
711
0
        UA_EventLoop *el = server->config.eventLoop;
712
0
        const UA_DateTime tNow = el->dateTime_nowMonotonic(el);
713
0
        timeoutDate = tNow + (timeout * UA_DATETIME_MSEC);
714
0
    }
715
716
    /* Call the operation */
717
0
    op->context.writeValue = *operation; /* Stable pointer */
718
0
    UA_Boolean done = Operation_Write(server, session, &op->context.writeValue,
719
0
                                      &op->output.directWrite);
720
0
    if(!done)
721
0
        return persistAsyncDirectOperation(server, op, UA_ASYNCOPERATIONTYPE_WRITE_DIRECT,
722
0
                                           context, (uintptr_t)callback, timeoutDate);
723
724
    /* Done, return right away */
725
0
    callback(server, context, op->output.directWrite);
726
0
    UA_free(op);
727
0
    return UA_STATUSCODE_GOOD;
728
0
}
729
730
UA_StatusCode
731
UA_Server_write_async(UA_Server *server, const UA_WriteValue *operation,
732
                      UA_ServerAsyncWriteResultCallback callback,
733
0
                      void *context, UA_UInt32 timeout) {
734
0
    lockServer(server);
735
0
    UA_StatusCode res = write_async(server, &server->adminSession, operation,
736
0
                                    callback, context, timeout);
737
0
    unlockServer(server);
738
0
    return res;
739
0
}
740
741
UA_StatusCode
742
UA_Server_setAsyncWriteResult(UA_Server *server,
743
                              const UA_DataValue *value,
744
0
                              UA_StatusCode result) {
745
0
    lockServer(server);
746
0
    UA_AsyncManager *am = &server->asyncManager;
747
0
    UA_AsyncOperation *op = NULL;
748
0
    TAILQ_FOREACH(op, &am->waitingOps, pointers) {
749
0
        if(&op->context.writeValue.value == value) {
750
0
            if(op->asyncOperationType == UA_ASYNCOPERATIONTYPE_WRITE_REQUEST)
751
0
                *op->output.write = result;
752
0
            else
753
0
                op->output.directWrite = result;
754
0
            processOperationResult(server, op);
755
0
            break;
756
0
        }
757
0
    }
758
0
    unlockServer(server);
759
0
    return (op) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADNOTFOUND;
760
0
}
761
762
/********/
763
/* Call */
764
/********/
765
766
#ifdef UA_ENABLE_METHODCALLS
767
UA_Boolean
768
Service_Call(UA_Server *server, UA_Session *session,
769
0
             const UA_CallRequest *request, UA_CallResponse *response) {
770
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session, "Processing CallRequest");
771
0
    UA_LOCK_ASSERT(&server->serviceMutex);
772
773
0
    if(server->config.maxNodesPerMethodCall != 0 &&
774
0
        request->methodsToCallSize > server->config.maxNodesPerMethodCall) {
775
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYOPERATIONS;
776
0
        return true;
777
0
    }
778
779
0
    if(request->methodsToCallSize == 0) {
780
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
781
0
        return true;
782
0
    }
783
784
    /* Allocate the response array. The AsyncResponse and AsyncOperations are
785
     * added to the back. So they get cleaned up automatically if none of the
786
     * calls are async. */
787
0
    size_t opsLen = sizeof(UA_CallMethodResult) * request->methodsToCallSize;
788
0
    size_t len = opsLen + sizeof(UA_AsyncResponse) +
789
0
        (sizeof(UA_AsyncOperation) * request->methodsToCallSize);
790
0
    response->results = (UA_CallMethodResult*)UA_calloc(1, len);
791
0
    if(!response->results) {
792
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
793
0
        return true;
794
0
    }
795
0
    response->resultsSize = request->methodsToCallSize;
796
797
    /* Execute the operations */
798
0
    UA_AsyncResponse *ar = (UA_AsyncResponse*)&response->results[response->resultsSize];
799
0
    UA_AsyncOperation *aopArray = (UA_AsyncOperation*)&ar[1];
800
0
    for(size_t i = 0; i < request->methodsToCallSize; i++) {
801
0
        UA_Boolean done = Operation_CallMethod(server, session, &request->methodsToCall[i],
802
0
                                               &response->results[i]);
803
0
        if(!done)
804
0
            persistAsyncResponseOperation(server, &aopArray[i],
805
0
                                          UA_ASYNCOPERATIONTYPE_CALL_REQUEST,
806
0
                                          ar, &response->results[i]);
807
0
    }
808
809
    /* If async operations are pending, persist them and signal the service is
810
     * not done */
811
0
    if(ar->opCountdown > 0) {
812
0
        ar->responseType = &UA_TYPES[UA_TYPES_CALLRESPONSE];
813
0
        persistAsyncResponse(server, session, response, ar);
814
0
    }
815
0
    return (ar->opCountdown == 0);
816
0
}
817
818
UA_StatusCode
819
call_async(UA_Server *server, UA_Session *session, const UA_CallMethodRequest *operation,
820
           UA_ServerAsyncMethodResultCallback callback, void *context,
821
0
           UA_UInt32 timeout) {
822
    /* Allocate the async operation. Do this first as we need the pointer to the
823
     * datavalue to be stable.*/
824
0
    UA_AsyncOperation *op = (UA_AsyncOperation*)UA_calloc(1, sizeof(UA_AsyncOperation));
825
0
    if(!op)
826
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
827
828
0
    UA_DateTime timeoutDate = UA_INT64_MAX;
829
0
    if(timeout > 0) {
830
0
        UA_EventLoop *el = server->config.eventLoop;
831
0
        const UA_DateTime tNow = el->dateTime_nowMonotonic(el);
832
0
        timeoutDate = tNow + (timeout * UA_DATETIME_MSEC);
833
0
    }
834
835
    /* Call the operation */
836
0
    UA_Boolean done = Operation_CallMethod(server, session, operation,
837
0
                                           &op->output.directCall);
838
0
    if(!done)
839
0
        return persistAsyncDirectOperation(server, op, UA_ASYNCOPERATIONTYPE_CALL_DIRECT,
840
0
                                           context, (uintptr_t)callback, timeoutDate);
841
842
    /* Done, return right away */
843
0
    callback(server, context, &op->output.directCall);
844
0
    UA_CallMethodResult_clear(&op->output.directCall);
845
0
    UA_free(op);
846
0
    return UA_STATUSCODE_GOOD;
847
0
}
848
849
UA_StatusCode
850
UA_Server_call_async(UA_Server *server, const UA_CallMethodRequest *operation,
851
                     UA_ServerAsyncMethodResultCallback callback,
852
0
                     void *context, UA_UInt32 timeout) {
853
0
    lockServer(server);
854
0
    UA_StatusCode res =
855
0
        call_async(server, &server->adminSession, operation, callback, context, timeout);
856
0
    unlockServer(server);
857
0
    return res;
858
0
}
859
860
UA_StatusCode
861
UA_Server_setAsyncCallMethodResult(UA_Server *server, UA_Variant *output,
862
0
                                   UA_StatusCode result) {
863
0
    lockServer(server);
864
0
    UA_AsyncManager *am = &server->asyncManager;
865
0
    UA_AsyncOperation *op = NULL;
866
0
    TAILQ_FOREACH(op, &am->waitingOps, pointers) {
867
0
        if(op->output.call->outputArguments == output) {
868
0
            op->output.call->statusCode = result;
869
0
            processOperationResult(server, op);
870
0
            break;
871
0
        }
872
0
        if(op->output.directCall.outputArguments == output) {
873
0
            op->output.directCall.statusCode = result;
874
0
            processOperationResult(server, op);
875
0
            break;
876
0
        }
877
0
    }
878
0
    unlockServer(server);
879
0
    return (op) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADNOTFOUND;
880
0
}
881
#endif