Coverage Report

Created: 2026-06-30 06:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541_15/src/client/ua_client_subscriptions.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2015-2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2015 (c) Oleksiy Vasylyev
7
 *    Copyright 2016 (c) Sten Grüner
8
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
9
 *    Copyright 2016-2017 (c) Florian Palm
10
 *    Copyright 2017 (c) Frank Meerkötter
11
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
12
 *    Copyright 2026 (c) o6 Automation GmbH (Author: Julius Pfrommer)
13
 */
14
15
#include <open62541/client_highlevel.h>
16
#include <open62541/client_highlevel_async.h>
17
18
#include "ua_client_internal.h"
19
20
struct UA_Client_MonitoredItem_ForDelete {
21
    UA_Client *client;
22
    UA_Client_Subscription *sub;
23
    UA_UInt32 *monitoredItemId;
24
};
25
26
/*****************/
27
/* Subscriptions */
28
/*****************/
29
30
/* For ZIP_TREE we use clientHandle comparison */
31
static enum ZIP_CMP
32
0
UA_ClientHandle_cmp(const void *a, const void *b) {
33
0
    const UA_Client_MonitoredItem *aa = (const UA_Client_MonitoredItem *)a;
34
0
    const UA_Client_MonitoredItem *bb = (const UA_Client_MonitoredItem *)b;
35
0
    if(aa->clientHandle < bb->clientHandle)
36
0
        return ZIP_CMP_LESS;
37
0
    if(aa->clientHandle > bb->clientHandle)
38
0
        return ZIP_CMP_MORE;
39
0
    return ZIP_CMP_EQ;
40
0
}
41
42
0
ZIP_FUNCTIONS(MonitorItemsTree, UA_Client_MonitoredItem, zipfields,
Unexecuted instantiation: ua_client_subscriptions.c:MonitorItemsTree_ZIP_INSERT
Unexecuted instantiation: ua_client_subscriptions.c:MonitorItemsTree_ZIP_REMOVE
Unexecuted instantiation: ua_client_subscriptions.c:MonitorItemsTree_ZIP_ITER
43
              UA_Client_MonitoredItem, zipfields, UA_ClientHandle_cmp)
44
45
static void
46
MonitoredItem_delete(UA_Client *client, UA_Client_Subscription *sub,
47
                     UA_Client_MonitoredItem *mon);
48
49
static void
50
Subscription_create(UA_Client *client, UA_Client_Subscription *newSub,
51
0
                    UA_CreateSubscriptionResponse *response) {
52
0
    UA_LOCK_ASSERT(&client->clientMutex);
53
54
0
    UA_EventLoop *el = client->config.eventLoop;
55
56
0
    newSub->subscriptionId = response->subscriptionId;
57
0
    newSub->sequenceNumber = 0;
58
0
    newSub->lastActivity = el->dateTime_nowMonotonic(el);
59
0
    newSub->publishingInterval = response->revisedPublishingInterval;
60
0
    newSub->maxKeepAliveCount = response->revisedMaxKeepAliveCount;
61
0
    ZIP_INIT(&newSub->monitoredItems);
62
0
    LIST_INSERT_HEAD(&client->subscriptions, newSub, listEntry);
63
64
    /* Immediately send the first publish requests if there are none
65
     * outstanding */
66
0
    __Client_Subscriptions_backgroundPublish(client);
67
0
}
68
69
static void
70
Subscriptions_create_handler(UA_Client *client, void *data,
71
0
                             UA_UInt32 requestId, void *r) {
72
0
    UA_LOCK_ASSERT(&client->clientMutex);
73
74
0
    UA_CreateSubscriptionResponse *response = (UA_CreateSubscriptionResponse *)r;
75
0
    CustomCallback *cc = (CustomCallback *)data;
76
0
    UA_Client_Subscription *newSub = (UA_Client_Subscription *)cc->clientData;
77
0
    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
78
0
        UA_free(newSub);
79
0
        goto cleanup;
80
0
    }
81
82
    /* Prepare the internal representation */
83
0
    Subscription_create(client, newSub, response);
84
85
0
cleanup:
86
0
    if(cc->userCallback)
87
0
        cc->userCallback(client, cc->userData, requestId, response);
88
0
    UA_free(cc);
89
0
}
90
91
UA_CreateSubscriptionResponse
92
UA_Client_Subscriptions_create(UA_Client *client,
93
                               const UA_CreateSubscriptionRequest request,
94
                               void *subscriptionContext,
95
                               UA_Client_StatusChangeNotificationCallback statusChangeCallback,
96
0
                               UA_Client_DeleteSubscriptionCallback deleteCallback) {
97
0
    lockClient(client);
98
99
0
    UA_CreateSubscriptionResponse response;
100
0
    UA_Client_Subscription *sub = (UA_Client_Subscription *)
101
0
        UA_malloc(sizeof(UA_Client_Subscription));
102
0
    if(!sub) {
103
0
        UA_CreateSubscriptionResponse_init(&response);
104
0
        response.responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
105
0
        unlockClient(client);
106
0
        return response;
107
0
    }
108
0
    sub->context = subscriptionContext;
109
0
    sub->statusChangeCallback = statusChangeCallback;
110
0
    sub->deleteCallback = deleteCallback;
111
112
    /* Send the request as a synchronous service call */
113
0
    __Client_Service(client, &request, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
114
0
                     &response, &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE]);
115
0
    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD) {
116
0
        UA_free(sub);
117
0
        unlockClient(client);
118
0
        return response;
119
0
    }
120
121
0
    Subscription_create(client, sub, &response);
122
123
0
    unlockClient(client);
124
0
    return response;
125
0
}
126
127
UA_StatusCode
128
UA_Client_Subscriptions_create_async(UA_Client *client, const UA_CreateSubscriptionRequest request,
129
                                     void *subscriptionContext,
130
                                     UA_Client_StatusChangeNotificationCallback statusChangeCallback,
131
                                     UA_Client_DeleteSubscriptionCallback deleteCallback,
132
                                     UA_ClientAsyncCreateSubscriptionCallback createCallback,
133
0
                                     void *userdata, UA_UInt32 *requestId) {
134
0
    CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
135
0
    if(!cc)
136
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
137
138
0
    UA_Client_Subscription *sub = (UA_Client_Subscription *)
139
0
        UA_malloc(sizeof(UA_Client_Subscription));
140
0
    if(!sub) {
141
0
        UA_free(cc);
142
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
143
0
    }
144
0
    sub->context = subscriptionContext;
145
0
    sub->statusChangeCallback = statusChangeCallback;
146
0
    sub->deleteCallback = deleteCallback;
147
148
0
    cc->userCallback = (UA_ClientAsyncServiceCallback)createCallback;
149
0
    cc->userData = userdata;
150
0
    cc->clientData = sub;
151
152
    /* Send the request as asynchronous service call */
153
0
    UA_StatusCode res =
154
0
        __UA_Client_AsyncService(client, &request,
155
0
                                 &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONREQUEST],
156
0
                                 Subscriptions_create_handler,
157
0
                                 &UA_TYPES[UA_TYPES_CREATESUBSCRIPTIONRESPONSE],
158
0
                                 cc, requestId);
159
0
    if(res != UA_STATUSCODE_GOOD) {
160
0
        UA_free(cc);
161
0
        UA_free(sub);
162
0
    }
163
0
    return res;
164
0
}
165
166
static UA_Client_Subscription *
167
0
findSubscriptionById(const UA_Client *client, UA_UInt32 subscriptionId) {
168
0
    UA_Client_Subscription *sub = NULL;
169
0
    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
170
0
        if(sub->subscriptionId == subscriptionId)
171
0
            break;
172
0
    }
173
0
    return sub;
174
0
}
175
176
static void
177
Subscription_modify(UA_Client *client, UA_Client_Subscription *sub,
178
0
                    const UA_ModifySubscriptionResponse *response) {
179
0
    sub->publishingInterval = response->revisedPublishingInterval;
180
0
    sub->maxKeepAliveCount = response->revisedMaxKeepAliveCount;
181
0
}
182
183
static void
184
Subscription_modify_handler(UA_Client *client, void *data,
185
0
                            UA_UInt32 requestId, void *r) {
186
0
    UA_LOCK_ASSERT(&client->clientMutex);
187
188
0
    UA_ModifySubscriptionResponse *response = (UA_ModifySubscriptionResponse *)r;
189
0
    CustomCallback *cc = (CustomCallback *)data;
190
0
    UA_Client_Subscription *sub =
191
0
        findSubscriptionById(client, (UA_UInt32)(uintptr_t)cc->clientData);
192
0
    if(sub) {
193
0
        Subscription_modify(client, sub, response);
194
0
    } else {
195
0
        UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
196
0
                    "No internal representation of subscription %" PRIu32,
197
0
                    (UA_UInt32)(uintptr_t)cc->clientData);
198
0
    }
199
200
0
    if(cc->userCallback)
201
0
        cc->userCallback(client, cc->userData, requestId, response);
202
0
    UA_free(cc);
203
0
}
204
205
UA_StatusCode
206
UA_Client_Subscriptions_getContext(UA_Client *client, UA_UInt32 subscriptionId,
207
0
                                   void **subContext) {
208
0
    if(!client || !subContext)
209
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
210
211
0
    lockClient(client);
212
0
    UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
213
0
    if(!sub) {
214
0
        unlockClient(client);
215
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
216
0
    }
217
218
0
    *subContext = sub->context;
219
0
    unlockClient(client);
220
0
    return UA_STATUSCODE_GOOD;
221
0
}
222
223
UA_StatusCode
224
UA_Client_Subscriptions_setContext(UA_Client *client, UA_UInt32 subscriptionId,
225
0
                                   void *subContext) {
226
0
    if(!client)
227
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
228
229
0
    lockClient(client);
230
0
    UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
231
0
    if(!sub) {
232
0
        unlockClient(client);
233
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
234
0
    }
235
236
0
    sub->context = subContext;
237
0
    unlockClient(client);
238
0
    return UA_STATUSCODE_GOOD;
239
0
}
240
241
UA_ModifySubscriptionResponse
242
UA_Client_Subscriptions_modify(UA_Client *client,
243
0
                               const UA_ModifySubscriptionRequest request) {
244
0
    UA_ModifySubscriptionResponse response;
245
0
    UA_ModifySubscriptionResponse_init(&response);
246
247
    /* Find the internal representation */
248
0
    lockClient(client);
249
0
    UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
250
0
    if(!sub) {
251
0
        unlockClient(client);
252
0
        response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
253
0
        return response;
254
0
    }
255
256
    /* Call the service */
257
0
    __Client_Service(client,
258
0
                     &request, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
259
0
                     &response, &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE]);
260
261
    /* Adjust the internal representation. Lookup again for thread-safety. */
262
0
    sub = findSubscriptionById(client, request.subscriptionId);
263
0
    if(!sub) {
264
0
        response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
265
0
        unlockClient(client);
266
0
        return response;
267
0
    }
268
0
    Subscription_modify(client, sub, &response);
269
0
    unlockClient(client);
270
0
    return response;
271
0
}
272
273
UA_StatusCode
274
UA_Client_Subscriptions_modify_async(UA_Client *client,
275
                                     const UA_ModifySubscriptionRequest request,
276
                                     UA_ClientAsyncModifySubscriptionCallback callback,
277
0
                                     void *userdata, UA_UInt32 *requestId) {
278
0
    lockClient(client);
279
280
0
    UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
281
0
    if(!sub) {
282
0
        unlockClient(client);
283
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
284
0
    }
285
286
0
    CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
287
0
    if(!cc) {
288
0
        unlockClient(client);
289
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
290
0
    }
291
292
0
    cc->clientData = (void *)(uintptr_t)request.subscriptionId;
293
0
    cc->userData = userdata;
294
0
    cc->userCallback = (UA_ClientAsyncServiceCallback)callback;
295
296
0
    UA_StatusCode res =
297
0
        __Client_AsyncService(client, &request,
298
0
                              &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONREQUEST],
299
0
                              Subscription_modify_handler,
300
0
                              &UA_TYPES[UA_TYPES_MODIFYSUBSCRIPTIONRESPONSE],
301
0
                              cc, requestId);
302
303
0
    unlockClient(client);
304
0
    return res;
305
0
}
306
307
static void *
308
0
MonitoredItem_delete_wrapper(void *data, UA_Client_MonitoredItem *mon) {
309
0
    struct UA_Client_MonitoredItem_ForDelete *deleteMonitoredItem =
310
0
        (struct UA_Client_MonitoredItem_ForDelete *)data;
311
0
    if(deleteMonitoredItem != NULL) {
312
0
        if(deleteMonitoredItem->monitoredItemId != NULL &&
313
0
           (mon->monitoredItemId != *deleteMonitoredItem->monitoredItemId)) {
314
0
            return NULL;
315
0
        }
316
0
        MonitoredItem_delete(deleteMonitoredItem->client, deleteMonitoredItem->sub, mon);
317
0
    }
318
0
    return NULL;
319
0
}
320
321
static void
322
__Client_Subscription_deleteInternal(UA_Client *client,
323
0
                                     UA_Client_Subscription *sub) {
324
    /* Remove the MonitoredItems */
325
0
    struct UA_Client_MonitoredItem_ForDelete deleteMonitoredItem;
326
0
    memset(&deleteMonitoredItem, 0, sizeof(struct UA_Client_MonitoredItem_ForDelete));
327
0
    deleteMonitoredItem.client = client;
328
0
    deleteMonitoredItem.sub = sub;
329
0
    ZIP_ITER(MonitorItemsTree, &sub->monitoredItems,
330
0
             MonitoredItem_delete_wrapper, &deleteMonitoredItem);
331
332
    /* Call the delete callback */
333
0
    if(sub->deleteCallback) {
334
0
        void *subC = sub->context;
335
0
        UA_UInt32 subId = sub->subscriptionId;
336
0
        sub->deleteCallback(client, subId, subC);
337
0
    }
338
339
    /* Remove */
340
0
    LIST_REMOVE(sub, listEntry);
341
0
    UA_free(sub);
342
0
}
343
344
static void
345
__Client_Subscription_processDelete(UA_Client *client,
346
                                    const UA_DeleteSubscriptionsRequest *request,
347
0
                                    const UA_DeleteSubscriptionsResponse *response)  {
348
0
    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
349
0
        return;
350
351
    /* Check that the request and response size -- use the same index for both */
352
0
    if(request->subscriptionIdsSize != response->resultsSize)
353
0
        return;
354
355
0
    for(size_t i = 0; i < request->subscriptionIdsSize; i++) {
356
0
        if(response->results[i] != UA_STATUSCODE_GOOD &&
357
0
           response->results[i] != UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID)
358
0
            continue;
359
360
        /* Get the Subscription */
361
0
        UA_Client_Subscription *sub =
362
0
            findSubscriptionById(client, request->subscriptionIds[i]);
363
0
        if(!sub) {
364
0
            UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
365
0
                        "No internal representation of subscription %" PRIu32,
366
0
                        request->subscriptionIds[i]);
367
0
            continue;
368
0
        }
369
370
        /* Delete the Subscription */
371
0
        __Client_Subscription_deleteInternal(client, sub);
372
0
    }
373
0
}
374
375
typedef struct {
376
    UA_DeleteSubscriptionsRequest request;
377
    UA_ClientAsyncServiceCallback userCallback;
378
    void *userData;
379
} DeleteSubscriptionCallback;
380
381
static void
382
Subscriptions_delete_handler(UA_Client *client, void *data,
383
0
                             UA_UInt32 requestId, void *r) {
384
0
    UA_DeleteSubscriptionsResponse *response =
385
0
        (UA_DeleteSubscriptionsResponse *)r;
386
0
    DeleteSubscriptionCallback *dsc =
387
0
        (DeleteSubscriptionCallback*)data;
388
389
0
    lockClient(client);
390
391
    /* Delete */
392
0
    __Client_Subscription_processDelete(client, &dsc->request, response);
393
394
    /* Userland Callback */
395
0
    if(dsc->userCallback)
396
0
        dsc->userCallback(client, dsc->userData, requestId, response);
397
398
    /* Cleanup */
399
0
    UA_DeleteSubscriptionsRequest_clear(&dsc->request);
400
0
    UA_free(dsc);
401
402
0
    unlockClient(client);
403
0
}
404
405
UA_StatusCode
406
UA_Client_Subscriptions_delete_async(UA_Client *client,
407
                                     const UA_DeleteSubscriptionsRequest request,
408
                                     UA_ClientAsyncDeleteSubscriptionsCallback callback,
409
0
                                     void *userdata, UA_UInt32 *requestId) {
410
    /* Make a copy of the request that persists into the async callback */
411
0
    DeleteSubscriptionCallback *dsc = (DeleteSubscriptionCallback*)
412
0
        UA_malloc(sizeof(DeleteSubscriptionCallback));
413
0
    if(!dsc)
414
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
415
0
    dsc->userCallback = (UA_ClientAsyncServiceCallback)callback;
416
0
    dsc->userData = userdata;
417
0
    UA_StatusCode res = UA_DeleteSubscriptionsRequest_copy(&request, &dsc->request);
418
0
    if(res != UA_STATUSCODE_GOOD) {
419
0
        UA_free(dsc);
420
0
        return res;
421
0
    }
422
423
    /* Make the async call */
424
0
    res = __UA_Client_AsyncService(client, &request,
425
0
                                   &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
426
0
                                   Subscriptions_delete_handler,
427
0
                                   &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE],
428
0
                                   dsc, requestId);
429
0
    if(res != UA_STATUSCODE_GOOD) {
430
0
        UA_DeleteSubscriptionsRequest_clear(&dsc->request);
431
0
        UA_free(dsc);
432
0
    }
433
0
    return res;
434
0
}
435
436
UA_DeleteSubscriptionsResponse
437
UA_Client_Subscriptions_delete(UA_Client *client,
438
0
                               const UA_DeleteSubscriptionsRequest request) {
439
0
    lockClient(client);
440
441
    /* Send the request */
442
0
    UA_DeleteSubscriptionsResponse response;
443
0
    __Client_Service(client, &request,
444
0
                     &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSREQUEST],
445
0
                     &response, &UA_TYPES[UA_TYPES_DELETESUBSCRIPTIONSRESPONSE]);
446
447
    /* Process */
448
0
    __Client_Subscription_processDelete(client, &request, &response);
449
450
0
    unlockClient(client);
451
0
    return response;
452
0
}
453
454
UA_StatusCode
455
0
UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId) {
456
0
    UA_DeleteSubscriptionsRequest request;
457
0
    UA_DeleteSubscriptionsRequest_init(&request);
458
0
    request.subscriptionIds = &subscriptionId;
459
0
    request.subscriptionIdsSize = 1;
460
461
0
    UA_DeleteSubscriptionsResponse response =
462
0
        UA_Client_Subscriptions_delete(client, request);
463
464
0
    UA_StatusCode retval = response.responseHeader.serviceResult;
465
0
    if(retval != UA_STATUSCODE_GOOD) {
466
0
        UA_DeleteSubscriptionsResponse_clear(&response);
467
0
        return retval;
468
0
    }
469
470
0
    if(response.resultsSize != 1) {
471
0
        UA_DeleteSubscriptionsResponse_clear(&response);
472
0
        return UA_STATUSCODE_BADINTERNALERROR;
473
0
    }
474
475
0
    retval = response.results[0];
476
0
    UA_DeleteSubscriptionsResponse_clear(&response);
477
0
    return retval;
478
0
}
479
480
/******************/
481
/* MonitoredItems */
482
/******************/
483
484
static void
485
MonitoredItem_delete(UA_Client *client, UA_Client_Subscription *sub,
486
0
                     UA_Client_MonitoredItem *mon) {
487
0
    UA_LOCK_ASSERT(&client->clientMutex);
488
489
0
    ZIP_REMOVE(MonitorItemsTree, &sub->monitoredItems, mon);
490
0
    if(mon->deleteCallback)
491
0
        mon->deleteCallback(client, sub->subscriptionId, sub->context,
492
0
                            mon->monitoredItemId, mon->context);
493
0
    for(size_t i = 0; i < mon->eventFields.mapSize; i++) {
494
0
        UA_Variant_init(&mon->eventFields.map[i].value);
495
0
    }
496
0
    UA_KeyValueMap_clear(&mon->eventFields);
497
0
    UA_free(mon);
498
0
}
499
500
static UA_StatusCode
501
prepareEventFieldsMap(UA_Client_MonitoredItem *newMon,
502
0
                      UA_MonitoringParameters *params) {
503
    /* Get the EventFilter */
504
0
    UA_ExtensionObject *eo = &params->filter;
505
0
    if(eo->content.decoded.type != &UA_TYPES[UA_TYPES_EVENTFILTER])
506
0
        return UA_STATUSCODE_GOOD;
507
0
    UA_EventFilter *ef = (UA_EventFilter*)eo->content.decoded.data;
508
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
509
510
    /* Check whether there are fields */
511
0
    if(ef->selectClausesSize == 0)
512
0
        return UA_STATUSCODE_GOOD;
513
514
    /* Allocate the map */
515
0
    newMon->eventFields.map = (UA_KeyValuePair*)
516
0
        UA_calloc(ef->selectClausesSize, sizeof(UA_KeyValuePair));
517
0
    if(!newMon->eventFields.map)
518
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
519
0
    newMon->eventFields.mapSize = ef->selectClausesSize;
520
521
    /* Create the key-strings for the fields */
522
0
    for(size_t i = 0; i < newMon->eventFields.mapSize; i++) {
523
0
        res |= UA_SimpleAttributeOperand_print(&ef->selectClauses[i],
524
0
                                               &newMon->eventFields.map[i].key.name);
525
0
    }
526
527
0
    return res;
528
0
}
529
530
static UA_StatusCode
531
MonitoredItem_createBegin(UA_Client *client, UA_Client_Subscription *sub,
532
                          UA_MonitoredItemCreateRequest *item,
533
                          UA_Client_DeleteMonitoredItemCallback deleteCallback,
534
                          void *context, void *handlingCallback,
535
0
                          UA_Client_MonitoredItem **outMon) {
536
    /* Allocate MonitoredItem */
537
0
    UA_Client_MonitoredItem *mon = (UA_Client_MonitoredItem *)
538
0
        UA_calloc(1, sizeof(UA_Client_MonitoredItem));
539
0
    if(!mon)
540
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
541
542
    /* Cache the field name map */
543
0
    mon->isEventMonitoredItem =
544
0
        (item->itemToMonitor.attributeId == UA_ATTRIBUTEID_EVENTNOTIFIER);
545
0
    if(mon->isEventMonitoredItem) {
546
0
        UA_StatusCode res = prepareEventFieldsMap(mon, &item->requestedParameters);
547
0
        if(res != UA_STATUSCODE_GOOD) {
548
0
            UA_free(mon);
549
0
            return res;
550
0
        }
551
0
    }
552
553
    /* Set a unique ClientHandle */
554
0
    item->requestedParameters.clientHandle = ++client->monitoredItemHandles;
555
556
    /* Fill in members and add to the client  */
557
0
    mon->clientHandle = item->requestedParameters.clientHandle;
558
0
    mon->context = context;
559
0
    mon->deleteCallback = deleteCallback;
560
0
    mon->handler.dataChangeCallback =
561
0
        (UA_Client_DataChangeNotificationCallback)(uintptr_t)handlingCallback;
562
0
    ZIP_INSERT(MonitorItemsTree, &sub->monitoredItems, mon);
563
564
0
    UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
565
0
                 "Subscription %" PRIu32 " | Added a MonitoredItem with handle %" PRIu32,
566
0
                 sub->subscriptionId, mon->clientHandle);
567
568
0
    *outMon = mon;
569
0
    return UA_STATUSCODE_GOOD;
570
0
}
571
572
static void
573
MonitoredItem_createFinish(UA_Client *client, UA_Client_Subscription *sub,
574
                           UA_Client_MonitoredItem *mon,
575
0
                           UA_MonitoredItemCreateResult *result) {
576
0
    UA_assert(result->statusCode == UA_STATUSCODE_GOOD);
577
578
0
    mon->monitoredItemId = result->monitoredItemId;
579
    /* revisedSamplingInterval; */
580
    /* revisedQueueSize; */
581
    /* filterResult; */
582
0
}
583
584
/************************************/
585
/* CreateMonitoredItems Synchronous */
586
/************************************/
587
588
static void
589
Client_MonitoredItems_create(UA_Client *client,
590
                             const UA_CreateMonitoredItemsRequest *constRequest,
591
                             void **contexts, void **handlingCallbacks,
592
                             UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
593
0
                             UA_CreateMonitoredItemsResponse *response) {
594
0
    UA_LOCK_ASSERT(&client->clientMutex);
595
0
    UA_CreateMonitoredItemsResponse_init(response);
596
597
    /* Any items? */
598
0
    if(constRequest->itemsToCreateSize == 0) {
599
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOTHINGTODO;
600
0
        return;
601
0
    }
602
603
    /* Get the subscription */
604
0
    UA_Client_Subscription *sub =
605
0
        findSubscriptionById(client, constRequest->subscriptionId);
606
0
    if(!sub) {
607
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
608
0
        return;
609
0
    }
610
611
    /* Make a mutable copy. We modify the request to set the internal
612
     * clientHandle. */
613
0
    UA_CreateMonitoredItemsRequest request;
614
0
    UA_StatusCode res =
615
0
        UA_CreateMonitoredItemsRequest_copy(constRequest, &request);
616
0
    if(res != UA_STATUSCODE_GOOD) {
617
0
        response->responseHeader.serviceResult = res;
618
0
        return;
619
0
    }
620
621
    /* Create the MonitoredItems */
622
0
    UA_STACKARRAY(UA_Client_MonitoredItem*, mons, request.itemsToCreateSize);
623
0
    memset(mons, 0, sizeof(UA_Client_MonitoredItem*) * request.itemsToCreateSize);
624
0
    for(size_t i = 0; i < request.itemsToCreateSize; i++) {
625
0
        void *context = (contexts) ? contexts[i] : NULL;
626
0
        void *handlingCallback = (handlingCallbacks) ? handlingCallbacks[i] : NULL;
627
0
        UA_Client_DeleteMonitoredItemCallback deleteCallback =
628
0
            (deleteCallbacks) ? deleteCallbacks[i] : NULL;
629
0
        res |= MonitoredItem_createBegin(client, sub, &request.itemsToCreate[i],
630
0
                                         deleteCallback, context,
631
0
                                         handlingCallback, &mons[i]);
632
0
    }
633
634
    /* Failure -> Delete created MonitoredItems. Directly call deleteCallback if
635
     * creation failed. The MonitoredItemId is not yet known, use zero. */
636
0
    if(res != UA_STATUSCODE_GOOD) {
637
0
        for(size_t i = 0; i < request.itemsToCreateSize; i++) {
638
0
            if(mons[i])
639
0
                MonitoredItem_delete(client, sub, mons[i]);
640
0
            else if(deleteCallbacks && contexts)
641
0
                deleteCallbacks[i](client, request.subscriptionId,
642
0
                                   sub->context, 0, contexts[i]);
643
0
        }
644
0
        UA_CreateMonitoredItemsRequest_clear(&request);
645
0
        response->responseHeader.serviceResult = res;
646
0
        return;
647
0
    }
648
649
    /* Call the service */
650
0
    __Client_Service(client, &request,
651
0
                     &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
652
0
                     response, &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE]);
653
654
    /* Check that the response size is good */
655
0
    if(response->responseHeader.serviceResult == UA_STATUSCODE_GOOD &&
656
0
       response->resultsSize != request.itemsToCreateSize)
657
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
658
659
    /* Update the MonitoredItems */
660
0
    for(size_t i = 0; i < request.itemsToCreateSize; i++) {
661
0
        UA_assert(mons[i]);
662
0
        UA_MonitoredItemCreateResult *item = &response->results[i];
663
0
        if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD ||
664
0
           item->statusCode != UA_STATUSCODE_GOOD) {
665
0
            MonitoredItem_delete(client, sub, mons[i]);
666
0
            continue;
667
0
        }
668
0
        MonitoredItem_createFinish(client, sub, mons[i], item);
669
0
    }
670
671
0
    UA_CreateMonitoredItemsRequest_clear(&request);
672
0
}
673
674
UA_CreateMonitoredItemsResponse
675
UA_Client_MonitoredItems_createDataChanges(UA_Client *client,
676
                                           const UA_CreateMonitoredItemsRequest request,
677
                                           void **contexts,
678
                                           UA_Client_DataChangeNotificationCallback *callbacks,
679
0
                                           UA_Client_DeleteMonitoredItemCallback *deleteCallbacks) {
680
0
    UA_CreateMonitoredItemsResponse response;
681
0
    lockClient(client);
682
0
    Client_MonitoredItems_create(client, &request, contexts, (void **)callbacks,
683
0
                                 deleteCallbacks, &response);
684
0
    unlockClient(client);
685
0
    return response;
686
0
}
687
688
UA_MonitoredItemCreateResult
689
UA_Client_MonitoredItems_createDataChange(UA_Client *client, UA_UInt32 subscriptionId,
690
                                          UA_TimestampsToReturn timestampsToReturn,
691
                                          const UA_MonitoredItemCreateRequest item,
692
                                          void *context,
693
                                          UA_Client_DataChangeNotificationCallback callback,
694
0
                                          UA_Client_DeleteMonitoredItemCallback deleteCallback) {
695
0
    UA_CreateMonitoredItemsRequest request;
696
0
    UA_CreateMonitoredItemsRequest_init(&request);
697
0
    request.subscriptionId = subscriptionId;
698
0
    request.timestampsToReturn = timestampsToReturn;
699
0
    request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
700
0
    request.itemsToCreateSize = 1;
701
0
    UA_CreateMonitoredItemsResponse response =
702
0
       UA_Client_MonitoredItems_createDataChanges(client, request, &context,
703
0
                                                   &callback, &deleteCallback);
704
0
    UA_MonitoredItemCreateResult result;
705
0
    UA_MonitoredItemCreateResult_init(&result);
706
0
    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
707
0
        result.statusCode = response.responseHeader.serviceResult;
708
0
    if(result.statusCode == UA_STATUSCODE_GOOD &&
709
0
       response.resultsSize != 1)
710
0
        result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
711
0
    if(result.statusCode == UA_STATUSCODE_GOOD) {
712
0
        result = response.results[0];
713
0
        UA_MonitoredItemCreateResult_init(&response.results[0]);
714
0
    }
715
0
    UA_CreateMonitoredItemsResponse_clear(&response);
716
0
    return result;
717
0
}
718
719
UA_CreateMonitoredItemsResponse
720
UA_Client_MonitoredItems_createEvents(UA_Client *client,
721
                                      const UA_CreateMonitoredItemsRequest request,
722
                                      void **contexts,
723
                                      UA_Client_EventNotificationCallback *callback,
724
0
                                      UA_Client_DeleteMonitoredItemCallback *deleteCallback) {
725
0
    UA_CreateMonitoredItemsResponse response;
726
0
    lockClient(client);
727
0
    Client_MonitoredItems_create(client, &request, contexts, (void **)callback,
728
0
                                 deleteCallback, &response);
729
0
    unlockClient(client);
730
0
    return response;
731
0
}
732
733
UA_MonitoredItemCreateResult
734
UA_Client_MonitoredItems_createEvent(UA_Client *client, UA_UInt32 subscriptionId,
735
                                     UA_TimestampsToReturn timestampsToReturn,
736
                                     const UA_MonitoredItemCreateRequest item, void *context,
737
                                     UA_Client_EventNotificationCallback callback,
738
0
                                     UA_Client_DeleteMonitoredItemCallback deleteCallback) {
739
0
    UA_CreateMonitoredItemsRequest request;
740
0
    UA_CreateMonitoredItemsRequest_init(&request);
741
0
    request.subscriptionId = subscriptionId;
742
0
    request.timestampsToReturn = timestampsToReturn;
743
0
    request.itemsToCreate = (UA_MonitoredItemCreateRequest*)(uintptr_t)&item;
744
0
    request.itemsToCreateSize = 1;
745
0
    UA_CreateMonitoredItemsResponse response =
746
0
       UA_Client_MonitoredItems_createEvents(client, request, &context,
747
0
                                             &callback, &deleteCallback);
748
0
    UA_MonitoredItemCreateResult result;
749
0
    UA_MonitoredItemCreateResult_init(&result);
750
0
    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
751
0
        result.statusCode = response.responseHeader.serviceResult;
752
0
    if(result.statusCode == UA_STATUSCODE_GOOD &&
753
0
       response.resultsSize != 1)
754
0
        result.statusCode = UA_STATUSCODE_BADINTERNALERROR;
755
0
    if(result.statusCode == UA_STATUSCODE_GOOD) {
756
0
        result = response.results[0];
757
0
        UA_MonitoredItemCreateResult_init(&response.results[0]);
758
0
    }
759
0
    UA_CreateMonitoredItemsResponse_clear(&response);
760
0
    return result;
761
0
}
762
763
/*************************************/
764
/* CreateMonitoredItems Asynchronous */
765
/*************************************/
766
767
/* The handles are an array of [subId, monSize, monHandleId1, monHandle2, ...] */
768
static void
769
MonitoredItems_create_async_handler(UA_Client *client, void *data,
770
0
                                    UA_UInt32 requestId, void *resp) {
771
0
    CustomCallback *cc = (CustomCallback*)data;
772
0
    UA_UInt32 *handles = (UA_UInt32*)cc->clientData;
773
0
    UA_CreateMonitoredItemsResponse *response =
774
0
        (UA_CreateMonitoredItemsResponse *)resp;
775
776
0
    lockClient(client);
777
778
    /* Extract the first elements from the handles */
779
0
    UA_UInt32 subId = handles[0];
780
0
    UA_UInt32 monSize = handles[1];
781
0
    UA_UInt32 *monHandles = handles + 2;
782
783
    /* Check that the response size is good */
784
0
    if(response->responseHeader.serviceResult == UA_STATUSCODE_GOOD &&
785
0
       response->resultsSize != monSize)
786
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
787
788
    /* Get the Subscription from the SubscriptionId */
789
0
    UA_Client_Subscription *sub = findSubscriptionById(client, subId);
790
0
    if(!sub)
791
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
792
793
    /* Update the MonitoredItems */
794
0
    UA_Client_MonitoredItem dummy;
795
0
    for(size_t i = 0; sub && i < monSize; i++) {
796
        /* Get the MonitoredItem from the ClientHandle */
797
0
        dummy.clientHandle = monHandles[i];
798
0
        UA_Client_MonitoredItem *mon =
799
0
            ZIP_FIND(MonitorItemsTree, &sub->monitoredItems, &dummy);
800
0
        if(!mon)
801
0
            continue;
802
803
        /* Delete MonitoredItem if the creation failed */
804
0
        UA_MonitoredItemCreateResult *item = &response->results[i];
805
0
        if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD ||
806
0
           item->statusCode != UA_STATUSCODE_GOOD) {
807
0
            MonitoredItem_delete(client, sub, mon);
808
0
            continue;
809
0
        }
810
811
        /* Update the MonitoredItem with the server's response  */
812
0
        MonitoredItem_createFinish(client, sub, mon, item);
813
0
    }
814
815
    /* Notify the application */
816
0
    if(cc->userCallback) {
817
0
        UA_ClientAsyncCreateMonitoredItemsCallback cb =
818
0
            (UA_ClientAsyncCreateMonitoredItemsCallback)cc->userCallback;
819
0
        cb(client, cc->userData, requestId, response);
820
0
    }
821
822
    /* Clean up */
823
0
    UA_free(handles);
824
0
    UA_free(cc);
825
826
0
    unlockClient(client);
827
0
}
828
829
static UA_StatusCode
830
Client_MonitoredItems_createAsync(UA_Client *client,
831
                                  const UA_CreateMonitoredItemsRequest *constRequest,
832
                                  void **contexts, void **handlingCallbacks,
833
                                  UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
834
                                  UA_ClientAsyncServiceCallback createCallback,
835
0
                                  void *userdata, UA_UInt32 *requestId) {
836
0
    UA_LOCK_ASSERT(&client->clientMutex);
837
838
    /* Get the Subscription */
839
0
    UA_Client_Subscription *sub =
840
0
        findSubscriptionById(client, constRequest->subscriptionId);
841
0
    if(!sub)
842
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
843
844
    /* Any items? */
845
0
    if(constRequest->itemsToCreateSize == 0)
846
0
        return UA_STATUSCODE_BADNOTHINGTODO;
847
848
    /* Make a mutable copy. We modify the request to set the internal
849
     * clientHandle. */
850
0
    UA_CreateMonitoredItemsRequest request;
851
0
    UA_StatusCode res =
852
0
        UA_CreateMonitoredItemsRequest_copy(constRequest, &request);
853
0
    if(res != UA_STATUSCODE_GOOD)
854
0
        return res;
855
856
    /* Allocate context for the async handling */
857
0
    CustomCallback *cc = (CustomCallback*)UA_calloc(1, sizeof(CustomCallback));
858
0
    if(!cc) {
859
0
        UA_CreateMonitoredItemsRequest_clear(&request);
860
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
861
0
    }
862
0
    UA_UInt32 *handles = (UA_UInt32*)
863
0
        UA_malloc(sizeof(UA_UInt32) * (request.itemsToCreateSize + 2));
864
0
    if(!handles) {
865
0
        UA_free(cc);
866
0
        UA_CreateMonitoredItemsRequest_clear(&request);
867
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
868
0
    }
869
870
    /* Create the MonitoredItems locally */
871
0
    UA_STACKARRAY(UA_Client_MonitoredItem*, mons, request.itemsToCreateSize);
872
0
    memset(mons, 0, sizeof(UA_Client_MonitoredItem*) * request.itemsToCreateSize);
873
0
    for(size_t i = 0; i < request.itemsToCreateSize; i++) {
874
0
        void *context = (contexts) ? contexts[i] : NULL;
875
0
        void *handlingCallback = (handlingCallbacks) ? handlingCallbacks[i] : NULL;
876
0
        UA_Client_DeleteMonitoredItemCallback deleteCallback =
877
0
            (deleteCallbacks) ? deleteCallbacks[i] : NULL;
878
0
        res |= MonitoredItem_createBegin(client, sub, &request.itemsToCreate[i],
879
0
                                         deleteCallback, context,
880
0
                                         handlingCallback, &mons[i]);
881
0
    }
882
883
    /* Failure -> Delete created MonitoredItems. Directly call deleteCallback if
884
     * creation failed. The MonitoredItemId is not yet known, use zero. */
885
0
    if(res != UA_STATUSCODE_GOOD) {
886
0
        for(size_t i = 0; i < request.itemsToCreateSize; i++) {
887
0
            if(mons[i])
888
0
                MonitoredItem_delete(client, sub, mons[i]);
889
0
            else if(deleteCallbacks && contexts)
890
0
                deleteCallbacks[i](client, request.subscriptionId,
891
0
                                   sub->context, 0, contexts[i]);
892
0
        }
893
0
        UA_free(handles);
894
0
        UA_free(cc);
895
0
        UA_CreateMonitoredItemsRequest_clear(&request);
896
0
        return res;
897
0
    }
898
899
    /* Set the async handler context */
900
0
    handles[0] = sub->subscriptionId;
901
0
    handles[1] = (UA_UInt32)request.itemsToCreateSize;
902
0
    for(size_t i = 0; i < request.itemsToCreateSize; i++) {
903
0
        handles[i+2] = mons[i]->clientHandle;
904
0
    }
905
0
    cc->clientData = handles;
906
0
    cc->userCallback = createCallback;
907
0
    cc->userData = userdata;
908
909
    /* Call the service asynchronously */
910
0
    res = __Client_AsyncService(client, &request,
911
0
                                &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSREQUEST],
912
0
                                MonitoredItems_create_async_handler,
913
0
                                &UA_TYPES[UA_TYPES_CREATEMONITOREDITEMSRESPONSE],
914
0
                                cc, requestId);
915
916
    /* Manually clean up the context in the failure case */
917
0
    if(res != UA_STATUSCODE_GOOD) {
918
0
        for(size_t i = 0; i < request.itemsToCreateSize; i++) {
919
0
            UA_assert(mons[i]);
920
0
            MonitoredItem_delete(client, sub, mons[i]);
921
0
        }
922
0
        UA_free(handles);
923
0
        UA_free(cc);
924
0
    }
925
926
    /* Clean up */
927
0
    UA_CreateMonitoredItemsRequest_clear(&request);
928
0
    return res;
929
0
}
930
931
UA_StatusCode
932
UA_Client_MonitoredItems_createDataChanges_async(UA_Client *client,
933
                                                 const UA_CreateMonitoredItemsRequest request,
934
                                                 void **contexts,
935
                                                 UA_Client_DataChangeNotificationCallback *callbacks,
936
                                                 UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
937
                                                 UA_ClientAsyncCreateMonitoredItemsCallback createCallback,
938
0
                                                 void *userdata, UA_UInt32 *requestId) {
939
0
    lockClient(client);
940
0
    UA_StatusCode res =
941
0
        Client_MonitoredItems_createAsync(client, &request, contexts,
942
0
                                          (void **)callbacks, deleteCallbacks,
943
0
                                          (UA_ClientAsyncServiceCallback)createCallback,
944
0
                                          userdata, requestId);
945
0
    unlockClient(client);
946
0
    return res;
947
0
}
948
949
UA_StatusCode
950
UA_Client_MonitoredItems_createEvents_async(UA_Client *client,
951
                                            const UA_CreateMonitoredItemsRequest request,
952
                                            void **contexts,
953
                                            UA_Client_EventNotificationCallback *callbacks,
954
                                            UA_Client_DeleteMonitoredItemCallback *deleteCallbacks,
955
                                            UA_ClientAsyncCreateMonitoredItemsCallback createCallback,
956
0
                                            void *userdata, UA_UInt32 *requestId) {
957
0
    lockClient(client);
958
0
    UA_StatusCode res =
959
0
        Client_MonitoredItems_createAsync(client, &request, contexts,
960
0
                                          (void **)callbacks, deleteCallbacks,
961
0
                                          (UA_ClientAsyncServiceCallback)createCallback,
962
0
                                          userdata, requestId);
963
0
    unlockClient(client);
964
0
    return res;
965
0
}
966
967
static void
968
MonitoredItems_delete(UA_Client *client, UA_Client_Subscription *sub,
969
                      const UA_DeleteMonitoredItemsRequest *request,
970
0
                      const UA_DeleteMonitoredItemsResponse *response) {
971
#ifdef __clang_analyzer__
972
    return;
973
#endif
974
975
    /* Loop over deleted MonitoredItems */
976
0
    struct UA_Client_MonitoredItem_ForDelete deleteMonitoredItem;
977
0
    memset(&deleteMonitoredItem, 0, sizeof(struct UA_Client_MonitoredItem_ForDelete));
978
0
    deleteMonitoredItem.client = client;
979
0
    deleteMonitoredItem.sub = sub;
980
981
0
    for(size_t i = 0; i < response->resultsSize; i++) {
982
0
        if(response->results[i] != UA_STATUSCODE_GOOD &&
983
0
           response->results[i] != UA_STATUSCODE_BADMONITOREDITEMIDINVALID) {
984
0
            continue;
985
0
        }
986
0
        deleteMonitoredItem.monitoredItemId = &request->monitoredItemIds[i];
987
        /* Delete the internal representation */
988
0
        ZIP_ITER(MonitorItemsTree,&sub->monitoredItems,
989
0
                 MonitoredItem_delete_wrapper, &deleteMonitoredItem);
990
0
    }
991
0
}
992
993
static void
994
0
MonitoredItems_delete_handler(UA_Client *client, void *d, UA_UInt32 requestId, void *r) {
995
0
    UA_Client_Subscription *sub = NULL;
996
0
    CustomCallback *cc = (CustomCallback *)d;
997
0
    UA_DeleteMonitoredItemsResponse *response = (UA_DeleteMonitoredItemsResponse *)r;
998
0
    UA_DeleteMonitoredItemsRequest *request =
999
0
        (UA_DeleteMonitoredItemsRequest *)cc->clientData;
1000
1001
0
    lockClient(client);
1002
1003
0
    if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
1004
0
        goto cleanup;
1005
1006
0
    sub = findSubscriptionById(client, request->subscriptionId);
1007
0
    if(!sub) {
1008
0
        UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
1009
0
                    "No internal representation of subscription %" PRIu32,
1010
0
                    request->subscriptionId);
1011
0
        goto cleanup;
1012
0
    }
1013
1014
    /* Delete MonitoredItems from the internal representation */
1015
0
    MonitoredItems_delete(client, sub, request, response);
1016
1017
0
cleanup:
1018
0
    if(cc->userCallback)
1019
0
        cc->userCallback(client, cc->userData, requestId, response);
1020
0
    UA_DeleteMonitoredItemsRequest_delete(request);
1021
0
    UA_free(cc);
1022
1023
0
    unlockClient(client);
1024
0
}
1025
1026
UA_DeleteMonitoredItemsResponse
1027
UA_Client_MonitoredItems_delete(UA_Client *client,
1028
0
                                const UA_DeleteMonitoredItemsRequest request) {
1029
    /* Send the request */
1030
0
    UA_DeleteMonitoredItemsResponse response;
1031
0
    __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
1032
0
                        &response, &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE]);
1033
1034
    /* A problem occured remote? */
1035
0
    if(response.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
1036
0
        return response;
1037
1038
0
    lockClient(client);
1039
1040
    /* Find the internal subscription representation */
1041
0
    UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
1042
0
    if(!sub) {
1043
0
        UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
1044
0
                    "No internal representation of subscription %" PRIu32,
1045
0
                    request.subscriptionId);
1046
0
        unlockClient(client);
1047
0
        return response;
1048
0
    }
1049
1050
    /* Remove MonitoredItems in the internal representation */
1051
0
    MonitoredItems_delete(client, sub, &request, &response);
1052
1053
0
    unlockClient(client);
1054
1055
0
    return response;
1056
0
}
1057
1058
UA_StatusCode
1059
UA_Client_MonitoredItems_delete_async(UA_Client *client,
1060
                                      const UA_DeleteMonitoredItemsRequest request,
1061
                                      UA_ClientAsyncDeleteMonitoredItemsCallback callback,
1062
0
                                      void *userdata, UA_UInt32 *requestId) {
1063
    /* Send the request */
1064
0
    CustomCallback *cc = (CustomCallback *)UA_calloc(1, sizeof(CustomCallback));
1065
0
    if(!cc)
1066
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
1067
1068
0
    UA_DeleteMonitoredItemsRequest *req_copy = UA_DeleteMonitoredItemsRequest_new();
1069
0
    if(!req_copy) {
1070
0
        UA_free(cc);
1071
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
1072
0
    }
1073
1074
0
    UA_DeleteMonitoredItemsRequest_copy(&request, req_copy);
1075
0
    cc->clientData = req_copy;
1076
0
    cc->userCallback = (UA_ClientAsyncServiceCallback)callback;
1077
0
    cc->userData = userdata;
1078
1079
0
    UA_StatusCode res =
1080
0
        __UA_Client_AsyncService(client, &request,
1081
0
                                 &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSREQUEST],
1082
0
                                 MonitoredItems_delete_handler,
1083
0
                                 &UA_TYPES[UA_TYPES_DELETEMONITOREDITEMSRESPONSE],
1084
0
                                 cc, requestId);
1085
0
    if(res != UA_STATUSCODE_GOOD) {
1086
0
        UA_DeleteMonitoredItemsRequest_delete(req_copy);
1087
0
        UA_free(cc);
1088
0
    }
1089
0
    return res;
1090
0
}
1091
1092
UA_StatusCode
1093
UA_Client_MonitoredItems_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId,
1094
0
                                      UA_UInt32 monitoredItemId) {
1095
0
    UA_DeleteMonitoredItemsRequest request;
1096
0
    UA_DeleteMonitoredItemsRequest_init(&request);
1097
0
    request.subscriptionId = subscriptionId;
1098
0
    request.monitoredItemIds = &monitoredItemId;
1099
0
    request.monitoredItemIdsSize = 1;
1100
1101
0
    UA_DeleteMonitoredItemsResponse response =
1102
0
        UA_Client_MonitoredItems_delete(client, request);
1103
1104
0
    UA_StatusCode retval = response.responseHeader.serviceResult;
1105
0
    if(retval != UA_STATUSCODE_GOOD) {
1106
0
        UA_DeleteMonitoredItemsResponse_clear(&response);
1107
0
        return retval;
1108
0
    }
1109
1110
0
    if(response.resultsSize != 1) {
1111
0
        UA_DeleteMonitoredItemsResponse_clear(&response);
1112
0
        return UA_STATUSCODE_BADINTERNALERROR;
1113
0
    }
1114
1115
0
    retval = response.results[0];
1116
0
    UA_DeleteMonitoredItemsResponse_clear(&response);
1117
0
    return retval;
1118
0
}
1119
1120
static void *
1121
0
MonitoredItem_findByID(void *data, UA_Client_MonitoredItem *mon) {
1122
0
    UA_UInt32 monitorId = *(UA_UInt32*)data;
1123
0
    if(monitorId && (mon->monitoredItemId == monitorId))
1124
0
        return mon;
1125
0
    return NULL;
1126
0
}
1127
1128
static UA_Client_MonitoredItem *
1129
0
findMonitoredItemById(UA_Client_Subscription *sub, UA_UInt32 monitoredItemId) {
1130
0
    return (UA_Client_MonitoredItem *)
1131
0
        ZIP_ITER(MonitorItemsTree, &sub->monitoredItems,
1132
0
                 MonitoredItem_findByID, &monitoredItemId);
1133
0
}
1134
1135
static void
1136
setExistingClientHandles(UA_Client_Subscription *sub,
1137
0
                         UA_ModifyMonitoredItemsRequest *request) {
1138
0
    for(size_t i = 0; i < request->itemsToModifySize; ++i) {
1139
0
        UA_MonitoredItemModifyRequest *mimr = &request->itemsToModify[i];
1140
0
        UA_Client_MonitoredItem *mon =
1141
0
            findMonitoredItemById(sub, mimr->monitoredItemId);
1142
0
        if(mon)
1143
0
            mimr->requestedParameters.clientHandle = mon->clientHandle;
1144
0
    }
1145
0
}
1146
1147
UA_ModifyMonitoredItemsResponse
1148
UA_Client_MonitoredItems_modify(UA_Client *client,
1149
0
                                const UA_ModifyMonitoredItemsRequest request) {
1150
0
    UA_ModifyMonitoredItemsResponse response;
1151
0
    UA_ModifyMonitoredItemsResponse_init(&response);
1152
1153
    /* Make a modifiable copy of the request */
1154
0
    UA_ModifyMonitoredItemsRequest modifiedRequest;
1155
0
    UA_StatusCode res = UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest);
1156
0
    if(res != UA_STATUSCODE_GOOD) {
1157
0
        response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
1158
0
        return response;
1159
0
    }
1160
1161
0
    lockClient(client);
1162
1163
    /* Get the subscription */
1164
0
    UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
1165
0
    if(!sub) {
1166
0
        unlockClient(client);
1167
0
        UA_ModifyMonitoredItemsRequest_clear(&modifiedRequest);
1168
0
        response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
1169
0
        return response;
1170
0
    }
1171
1172
    /* Reuse the existing ClientHandles for the MonitoredItems */
1173
0
    setExistingClientHandles(sub, &modifiedRequest);
1174
1175
    /* Call the service */
1176
0
    __Client_Service(client, &modifiedRequest,
1177
0
                     &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST], &response,
1178
0
                     &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE]);
1179
1180
0
    unlockClient(client);
1181
1182
    /* Clean up */
1183
0
    UA_ModifyMonitoredItemsRequest_clear(&modifiedRequest);
1184
0
    return response;
1185
0
}
1186
1187
UA_StatusCode
1188
UA_Client_MonitoredItems_modify_async(UA_Client *client,
1189
                                      const UA_ModifyMonitoredItemsRequest request,
1190
                                      UA_ClientAsyncModifyMonitoredItemsCallback callback,
1191
0
                                      void *userdata, UA_UInt32 *requestId) {
1192
    /* Make a modifiable copy of the request */
1193
0
    UA_ModifyMonitoredItemsRequest modifiedRequest;
1194
0
    UA_StatusCode res = UA_ModifyMonitoredItemsRequest_copy(&request, &modifiedRequest);
1195
0
    if(res != UA_STATUSCODE_GOOD)
1196
0
        return res;
1197
1198
0
    lockClient(client);
1199
1200
    /* Get the subscription */
1201
0
    UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
1202
0
    if(!sub) {
1203
0
        unlockClient(client);
1204
0
        UA_ModifyMonitoredItemsRequest_clear(&modifiedRequest);
1205
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
1206
0
    }
1207
1208
    /* Reuse the existing ClientHandles for the MonitoredItems */
1209
0
    setExistingClientHandles(sub, &modifiedRequest);
1210
1211
    /* Call the service */
1212
0
    UA_StatusCode statusCode = __Client_AsyncService(
1213
0
        client, &modifiedRequest, &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSREQUEST],
1214
0
        (UA_ClientAsyncServiceCallback)callback,
1215
0
        &UA_TYPES[UA_TYPES_MODIFYMONITOREDITEMSRESPONSE], userdata, requestId);
1216
1217
0
    unlockClient(client);
1218
1219
    /* Clean up */
1220
0
    UA_ModifyMonitoredItemsRequest_clear(&modifiedRequest);
1221
0
    return statusCode;
1222
0
}
1223
1224
UA_StatusCode
1225
UA_Client_MonitoredItem_getContext(UA_Client *client, UA_UInt32 subscriptionId,
1226
0
                                   UA_UInt32 monitoredItemId, void **monContext) {
1227
0
    if(!client || !monContext)
1228
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1229
1230
0
    *monContext = NULL;
1231
1232
0
    lockClient(client);
1233
0
    UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
1234
0
    if(!sub) {
1235
0
        unlockClient(client);
1236
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
1237
0
    }
1238
1239
0
    UA_StatusCode itemstatus = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
1240
0
    UA_Client_MonitoredItem *monItem = findMonitoredItemById(sub, monitoredItemId);
1241
0
    if(monItem) {
1242
0
        *monContext = monItem->context;
1243
0
        itemstatus = UA_STATUSCODE_GOOD;
1244
0
    }
1245
0
    unlockClient(client);
1246
0
    return itemstatus;
1247
0
}
1248
1249
UA_StatusCode
1250
UA_Client_MonitoredItem_setContext(UA_Client *client, UA_UInt32 subscriptionId,
1251
0
                                   UA_UInt32 monitoredItemId, void *monContext) {
1252
0
    if(!client)
1253
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1254
1255
0
    lockClient(client);
1256
0
    UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
1257
0
    if(!sub) {
1258
0
        unlockClient(client);
1259
0
        return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
1260
0
    }
1261
1262
0
    UA_StatusCode itemstatus = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
1263
0
    UA_Client_MonitoredItem *monItem = findMonitoredItemById(sub, monitoredItemId);
1264
0
    if(monItem) {
1265
0
        monItem->context = monContext;
1266
0
        itemstatus = UA_STATUSCODE_GOOD;
1267
0
    }
1268
0
    unlockClient(client);
1269
0
    return itemstatus;
1270
0
}
1271
1272
/*************************************/
1273
/* Async Processing of Notifications */
1274
/*************************************/
1275
1276
/* Assume the request is already initialized */
1277
UA_StatusCode
1278
0
__Client_preparePublishRequest(UA_Client *client, UA_PublishRequest *request) {
1279
0
    UA_LOCK_ASSERT(&client->clientMutex);
1280
1281
    /* Count acks */
1282
0
    UA_Client_NotificationsAckNumber *ack;
1283
0
    LIST_FOREACH(ack, &client->pendingNotificationsAcks, listEntry)
1284
0
        ++request->subscriptionAcknowledgementsSize;
1285
1286
    /* Create the array. Returns a sentinel pointer if the length is zero. */
1287
0
    request->subscriptionAcknowledgements = (UA_SubscriptionAcknowledgement*)
1288
0
        UA_Array_new(request->subscriptionAcknowledgementsSize,
1289
0
                     &UA_TYPES[UA_TYPES_SUBSCRIPTIONACKNOWLEDGEMENT]);
1290
0
    if(!request->subscriptionAcknowledgements) {
1291
0
        request->subscriptionAcknowledgementsSize = 0;
1292
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
1293
0
    }
1294
1295
0
    size_t i = 0;
1296
0
    UA_Client_NotificationsAckNumber *ack_tmp;
1297
0
    LIST_FOREACH_SAFE(ack, &client->pendingNotificationsAcks, listEntry, ack_tmp) {
1298
0
        LIST_REMOVE(ack, listEntry);
1299
0
        UA_SubscriptionAcknowledgement *reqAck = &request->subscriptionAcknowledgements[i];
1300
0
        reqAck->sequenceNumber = ack->subAck.sequenceNumber;
1301
0
        reqAck->subscriptionId = ack->subAck.subscriptionId;
1302
0
        UA_free(ack);
1303
0
        i++;
1304
0
    }
1305
0
    return UA_STATUSCODE_GOOD;
1306
0
}
1307
1308
/* According to specification, Part 4 5.13.1, the value 0 is never used for the
1309
 * sequence number */
1310
static UA_UInt32
1311
0
__nextSequenceNumber(UA_UInt32 sequenceNumber) {
1312
0
    UA_UInt32 nextSequenceNumber = sequenceNumber + 1;
1313
0
    if(nextSequenceNumber == 0)
1314
0
        nextSequenceNumber = 1;
1315
0
    return nextSequenceNumber;
1316
0
}
1317
1318
static void
1319
processDataChangeNotification(UA_Client *client, UA_Client_Subscription *sub,
1320
0
                              UA_DataChangeNotification *dataChangeNotification) {
1321
0
    UA_LOCK_ASSERT(&client->clientMutex);
1322
1323
0
    for(size_t j = 0; j < dataChangeNotification->monitoredItemsSize; ++j) {
1324
0
        UA_MonitoredItemNotification *min = &dataChangeNotification->monitoredItems[j];
1325
1326
        /* Find the MonitoredItem */
1327
0
        UA_Client_MonitoredItem *mon;
1328
0
        UA_Client_MonitoredItem dummy;
1329
0
        dummy.clientHandle = min->clientHandle;
1330
0
        mon = ZIP_FIND(MonitorItemsTree, &sub->monitoredItems, &dummy);
1331
1332
0
        if(!mon) {
1333
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1334
0
                           "Could not process a notification with clienthandle %" PRIu32
1335
0
                           " on subscription %" PRIu32, min->clientHandle, sub->subscriptionId);
1336
0
            continue;
1337
0
        }
1338
1339
0
        if(mon->isEventMonitoredItem) {
1340
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1341
0
                           "MonitoredItem is configured for Events. But received a "
1342
0
                           "DataChangeNotification.");
1343
0
            continue;
1344
0
        }
1345
1346
0
        if(mon->handler.dataChangeCallback) {
1347
0
            void *subC = sub->context;
1348
0
            void *monC = mon->context;
1349
0
            UA_UInt32 subId = sub->subscriptionId;
1350
0
            UA_UInt32 monId = mon->monitoredItemId;
1351
0
            mon->handler.dataChangeCallback(client, subId, subC, monId, monC, &min->value);
1352
0
        }
1353
0
    }
1354
0
}
1355
1356
static void
1357
processEventNotification(UA_Client *client, UA_Client_Subscription *sub,
1358
0
                         UA_EventNotificationList *eventNotificationList) {
1359
0
    UA_LOCK_ASSERT(&client->clientMutex);
1360
1361
0
    for(size_t j = 0; j < eventNotificationList->eventsSize; ++j) {
1362
0
        UA_EventFieldList *efl = &eventNotificationList->events[j];
1363
1364
        /* Find the MonitoredItem */
1365
0
        UA_Client_MonitoredItem *mon;
1366
0
        UA_Client_MonitoredItem dummy;
1367
0
        dummy.clientHandle = efl->clientHandle;
1368
0
        mon = ZIP_FIND(MonitorItemsTree, &sub->monitoredItems, &dummy);
1369
1370
0
        if(!mon) {
1371
0
            UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
1372
0
                         "Could not process a notification with clienthandle %" PRIu32
1373
0
                         " on subscription %" PRIu32, efl->clientHandle,
1374
0
                         sub->subscriptionId);
1375
0
            continue;
1376
0
        }
1377
1378
0
        if(!mon->isEventMonitoredItem) {
1379
0
            UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
1380
0
                         "MonitoredItem is configured for DataChanges. But received a "
1381
0
                         "EventNotification");
1382
0
            continue;
1383
0
        }
1384
1385
0
        if(mon->eventFields.mapSize != efl->eventFieldsSize) {
1386
0
            UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
1387
0
                         "MonitoredItem received a EventNotification with the "
1388
0
                          "wrong number of event fields");
1389
0
            continue;
1390
0
        }
1391
1392
        /* Prepare the key-value map and call the callback  */
1393
0
        for(size_t i = 0; i < mon->eventFields.mapSize; i++) {
1394
0
            mon->eventFields.map[i].value = efl->eventFields[i];
1395
0
        }
1396
0
        mon->handler.eventCallback(client, sub->subscriptionId, sub->context,
1397
0
                                   mon->monitoredItemId, mon->context, mon->eventFields);
1398
0
    }
1399
0
}
1400
1401
static void
1402
processNotificationMessage(UA_Client *client, UA_Client_Subscription *sub,
1403
0
                           UA_ExtensionObject *msg) {
1404
0
    UA_LOCK_ASSERT(&client->clientMutex);
1405
1406
0
    if(msg->encoding != UA_EXTENSIONOBJECT_DECODED)
1407
0
        return;
1408
1409
    /* Handle DataChangeNotification */
1410
0
    if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_DATACHANGENOTIFICATION]) {
1411
0
        UA_DataChangeNotification *dataChangeNotification =
1412
0
            (UA_DataChangeNotification *)msg->content.decoded.data;
1413
0
        processDataChangeNotification(client, sub, dataChangeNotification);
1414
0
        return;
1415
0
    }
1416
1417
    /* Handle EventNotification */
1418
0
    if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_EVENTNOTIFICATIONLIST]) {
1419
0
        UA_EventNotificationList *eventNotificationList =
1420
0
            (UA_EventNotificationList *)msg->content.decoded.data;
1421
0
        processEventNotification(client, sub, eventNotificationList);
1422
0
        return;
1423
0
    }
1424
1425
    /* Handle StatusChangeNotification */
1426
0
    if(msg->content.decoded.type == &UA_TYPES[UA_TYPES_STATUSCHANGENOTIFICATION]) {
1427
0
        if(sub->statusChangeCallback) {
1428
0
            void *subC = sub->context;
1429
0
            UA_UInt32 subId = sub->subscriptionId;
1430
0
            sub->statusChangeCallback(client, subId, subC,
1431
0
                                      (UA_StatusChangeNotification*)msg->content.decoded.data);
1432
0
        } else {
1433
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1434
0
                           "Dropped a StatusChangeNotification since no "
1435
0
                           "callback is registered");
1436
0
        }
1437
0
        return;
1438
0
    }
1439
1440
0
    UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1441
0
                   "Unknown notification message type");
1442
0
}
1443
1444
static void
1445
__Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishRequest *request,
1446
0
                                              UA_PublishResponse *response) {
1447
0
    UA_LOCK_ASSERT(&client->clientMutex);
1448
1449
    /* Reduce the number of "in-flight" PublishRequests */
1450
0
    client->currentlyOutStandingPublishRequests--;
1451
1452
    /* Process ServiceResult for bad StatusCodes without referring to a
1453
     * SubscriptionId */
1454
0
    switch(response->responseHeader.serviceResult) {
1455
0
    case UA_STATUSCODE_BADTOOMANYPUBLISHREQUESTS:
1456
        /* Correct the assumed number of max outstanding requests */
1457
0
        if(client->config.outStandingPublishRequests > 1) {
1458
0
            client->config.outStandingPublishRequests--;
1459
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1460
0
                           "PublishResponse: Too many PublishRequest, reduce "
1461
0
                           "outStandingPublishRequests to %" PRId16,
1462
0
                           client->config.outStandingPublishRequests);
1463
0
        } else {
1464
0
            UA_LOG_ERROR(client->config.logging, UA_LOGCATEGORY_CLIENT,
1465
0
                         "PublishResponse: Too many PublishRequests when "
1466
0
                         "outStandingPublishRequests = 1");
1467
0
            UA_Client_Subscriptions_deleteSingle(client, response->subscriptionId);
1468
0
        }
1469
0
        return;
1470
1471
0
    case UA_STATUSCODE_BADSHUTDOWN:
1472
        /* If the remote server shuts down, DEBUG-log to avoid a warning-storm
1473
         * for normal operations */
1474
0
        UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
1475
0
                     "PublishResponse: Received BadShutdown status");
1476
0
        return;
1477
1478
0
    case UA_STATUSCODE_BADNOSUBSCRIPTION:
1479
        /* There is no Subscription configured, the server expects no
1480
         * PublishRequests. We demote this to debug-logging, as it can occur
1481
         * during regular shutdown when the Subscriptions are removed. */
1482
0
        UA_LOG_DEBUG(client->config.logging, UA_LOGCATEGORY_CLIENT,
1483
0
                     "PublishResponse: Received BadNoSubscription status");
1484
0
        return;
1485
1486
0
    default:
1487
0
        break;
1488
0
    }
1489
1490
    /* Get the Subscription */
1491
0
    UA_Client_Subscription *sub = findSubscriptionById(client, response->subscriptionId);
1492
0
    if(!sub) {
1493
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
1494
0
        UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1495
0
                       "PublishResponse: Received response for an unknown Subscription");
1496
0
        return;
1497
0
    }
1498
1499
    /* Process ServiceResult */
1500
0
    switch(response->responseHeader.serviceResult) {
1501
0
    case UA_STATUSCODE_BADSESSIONCLOSED:
1502
        /* The Session no longer exists on the server - remove the Subscription */
1503
0
        __Client_Subscription_deleteInternal(client, sub);
1504
0
        return;
1505
1506
0
    case UA_STATUSCODE_BADTIMEOUT:
1507
0
        UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1508
0
                       "PublishResponse: Aborted with BadTimeout status");
1509
0
        if(client->config.subscriptionInactivityCallback) {
1510
0
            void *subC = sub->context;
1511
0
            UA_UInt32 subId = sub->subscriptionId;
1512
0
            client->config.subscriptionInactivityCallback(client, subId, subC);
1513
0
        }
1514
0
        return;
1515
1516
0
    case UA_STATUSCODE_GOOD:
1517
0
        break; /* Continue below */
1518
1519
0
    default:
1520
        /* Catch-all for other bad StatusCodes */
1521
0
        UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1522
0
                       "PublishResponse: Received %s status",
1523
0
                       UA_StatusCode_name(response->responseHeader.serviceResult));
1524
0
        return;
1525
0
    }
1526
1527
    /* Update the LastActivity for the Subscription */
1528
0
    UA_EventLoop *el = client->config.eventLoop;
1529
0
    sub->lastActivity = el->dateTime_nowMonotonic(el);
1530
1531
    /* Detect missing message - OPC Unified Architecture, Part 4 5.13.1.1 e) */
1532
0
    UA_NotificationMessage *msg = &response->notificationMessage;
1533
0
    if(__nextSequenceNumber(sub->sequenceNumber) != msg->sequenceNumber) {
1534
0
        UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1535
0
                       "PublishResponse: Invalid subscription sequence number: "
1536
0
                       "Expected %" PRIu32 " but got %" PRIu32,
1537
0
                       __nextSequenceNumber(sub->sequenceNumber), msg->sequenceNumber);
1538
        /* This is an error. But we do not abort the connection. Some server
1539
         * SDKs misbehave from time to time and send out-of-order sequence
1540
         * numbers. (Probably some multi-threading synchronization issue.) */
1541
        /* UA_Client_disconnect(client);
1542
           return; */
1543
0
    }
1544
1545
    /* According to f), a keep-alive message contains no notifications and has
1546
     * the sequence number of the next NotificationMessage that is to be sent =>
1547
     * More than one consecutive keep-alive message or a NotificationMessage
1548
     * following a keep-alive message will share the same sequence number. */
1549
0
    if(msg->notificationDataSize)
1550
0
        sub->sequenceNumber = msg->sequenceNumber;
1551
1552
    /* Process the notification messages */
1553
0
    for(size_t k = 0; k < msg->notificationDataSize; ++k)
1554
0
        processNotificationMessage(client, sub, &msg->notificationData[k]);
1555
1556
    /* Add the current NotificationMessage (SequenceNumber) to the list of
1557
     * pending acks to be acknowledged. But only if it is in the list of
1558
     * sequence numbers the server has available. */
1559
0
    for(size_t i = 0; i < response->availableSequenceNumbersSize; i++) {
1560
0
        if(response->availableSequenceNumbers[i] != msg->sequenceNumber)
1561
0
            continue;
1562
0
        UA_Client_NotificationsAckNumber *tmpAck = (UA_Client_NotificationsAckNumber*)
1563
0
            UA_malloc(sizeof(UA_Client_NotificationsAckNumber));
1564
0
        if(!tmpAck) {
1565
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1566
0
                           "PublishResponse: Not enough memory to store the pending "
1567
0
                           "acknowledgement for Subscription %" PRIu32, sub->subscriptionId);
1568
0
            break;
1569
0
        }
1570
0
        tmpAck->subAck.sequenceNumber = msg->sequenceNumber;
1571
0
        tmpAck->subAck.subscriptionId = sub->subscriptionId;
1572
0
        LIST_INSERT_HEAD(&client->pendingNotificationsAcks, tmpAck, listEntry);
1573
0
        break;
1574
0
    }
1575
0
}
1576
1577
static void
1578
processPublishResponseAsync(UA_Client *client, void *userdata,
1579
0
                            UA_UInt32 requestId, void *response) {
1580
0
    UA_PublishRequest *req = (UA_PublishRequest*)userdata;
1581
0
    UA_PublishResponse *res = (UA_PublishResponse*)response;
1582
1583
0
    lockClient(client);
1584
1585
    /* Process the response */
1586
0
    __Client_Subscriptions_processPublishResponse(client, req, res);
1587
1588
    /* Delete the cached request */
1589
0
    UA_PublishRequest_delete(req);
1590
1591
    /* Fill up the outstanding publish requests */
1592
0
    __Client_Subscriptions_backgroundPublish(client);
1593
1594
0
    unlockClient(client);
1595
0
}
1596
1597
void
1598
120
__Client_Subscriptions_clear(UA_Client *client) {
1599
120
    UA_Client_NotificationsAckNumber *n;
1600
120
    UA_Client_NotificationsAckNumber *tmp;
1601
120
    LIST_FOREACH_SAFE(n, &client->pendingNotificationsAcks, listEntry, tmp) {
1602
0
        LIST_REMOVE(n, listEntry);
1603
0
        UA_free(n);
1604
0
    }
1605
1606
120
    UA_Client_Subscription *sub;
1607
120
    UA_Client_Subscription *tmps;
1608
120
    LIST_FOREACH_SAFE(sub, &client->subscriptions, listEntry, tmps)
1609
0
        __Client_Subscription_deleteInternal(client, sub); /* force local removal */
1610
1611
120
    client->monitoredItemHandles = 0;
1612
120
}
1613
1614
void
1615
0
__Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client) {
1616
0
    UA_LOCK_ASSERT(&client->clientMutex);
1617
1618
0
    UA_EventLoop *el = client->config.eventLoop;
1619
0
    UA_DateTime nowm = el->dateTime_nowMonotonic(el);
1620
1621
0
    UA_Client_Subscription *sub;
1622
0
    LIST_FOREACH(sub, &client->subscriptions, listEntry) {
1623
0
        UA_DateTime maxSilence = (UA_DateTime)
1624
0
            ((sub->publishingInterval * sub->maxKeepAliveCount) +
1625
0
             client->config.timeout) * UA_DATETIME_MSEC;
1626
0
        if(maxSilence + sub->lastActivity < nowm) {
1627
            /* Reset activity */
1628
0
            sub->lastActivity = nowm;
1629
1630
0
            if(client->config.subscriptionInactivityCallback) {
1631
0
                void *subC = sub->context;
1632
0
                UA_UInt32 subId = sub->subscriptionId;
1633
0
                client->config.subscriptionInactivityCallback(client, subId, subC);
1634
0
            }
1635
0
            UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
1636
0
                           "Inactivity for Subscription %" PRIu32 ".",
1637
0
                           sub->subscriptionId);
1638
0
        }
1639
0
    }
1640
0
}
1641
1642
void
1643
0
__Client_Subscriptions_backgroundPublish(UA_Client *client) {
1644
0
    UA_LOCK_ASSERT(&client->clientMutex);
1645
1646
0
    if(client->sessionState != UA_SESSIONSTATE_ACTIVATED)
1647
0
        return;
1648
1649
    /* The session must have at least one subscription */
1650
0
    if(!LIST_FIRST(&client->subscriptions))
1651
0
        return;
1652
1653
0
    while(client->currentlyOutStandingPublishRequests < client->config.outStandingPublishRequests) {
1654
0
        UA_PublishRequest *request = UA_PublishRequest_new();
1655
0
        if(!request)
1656
0
            return;
1657
1658
        /* Publish requests are valid for 10 minutes */
1659
0
        request->requestHeader.timeoutHint = 10 * 60 * 1000;
1660
1661
0
        UA_StatusCode retval = __Client_preparePublishRequest(client, request);
1662
0
        if(retval != UA_STATUSCODE_GOOD) {
1663
0
            UA_PublishRequest_delete(request);
1664
0
            return;
1665
0
        }
1666
1667
0
        retval = __Client_AsyncService(client, request,
1668
0
                                         &UA_TYPES[UA_TYPES_PUBLISHREQUEST],
1669
0
                                         processPublishResponseAsync,
1670
0
                                         &UA_TYPES[UA_TYPES_PUBLISHRESPONSE],
1671
0
                                         (void*)request, NULL);
1672
0
        if(retval != UA_STATUSCODE_GOOD) {
1673
0
            UA_PublishRequest_delete(request);
1674
0
            return;
1675
0
        }
1676
1677
0
        client->currentlyOutStandingPublishRequests++;
1678
0
    }
1679
0
}
1680
1681
UA_SetPublishingModeResponse
1682
UA_Client_Subscriptions_setPublishingMode(UA_Client *client,
1683
0
                                          const UA_SetPublishingModeRequest request) {
1684
0
    UA_SetPublishingModeResponse response;
1685
0
    __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_SETPUBLISHINGMODEREQUEST],
1686
0
                        &response, &UA_TYPES[UA_TYPES_SETPUBLISHINGMODERESPONSE]);
1687
0
    return response;
1688
0
}
1689
1690
UA_SetMonitoringModeResponse
1691
UA_Client_MonitoredItems_setMonitoringMode(UA_Client *client,
1692
0
                                           const UA_SetMonitoringModeRequest request) {
1693
0
    UA_SetMonitoringModeResponse response;
1694
0
    __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_SETMONITORINGMODEREQUEST],
1695
0
                        &response, &UA_TYPES[UA_TYPES_SETMONITORINGMODERESPONSE]);
1696
0
    return response;
1697
0
}
1698
1699
UA_StatusCode
1700
UA_Client_MonitoredItems_setMonitoringMode_async(UA_Client *client,
1701
                                                 const UA_SetMonitoringModeRequest request,
1702
                                                 UA_ClientAsyncSetMonitoringModeCallback callback,
1703
0
                                                 void *userdata, UA_UInt32 *requestId) {
1704
0
    return __UA_Client_AsyncService(client, &request,
1705
0
                                    &UA_TYPES[UA_TYPES_SETMONITORINGMODEREQUEST],
1706
0
                                    (UA_ClientAsyncServiceCallback)callback,
1707
0
                                    &UA_TYPES[UA_TYPES_SETMONITORINGMODERESPONSE],
1708
0
                                    userdata, requestId);
1709
0
}
1710
1711
UA_SetTriggeringResponse
1712
UA_Client_MonitoredItems_setTriggering(UA_Client *client,
1713
0
                                       const UA_SetTriggeringRequest request) {
1714
0
    UA_SetTriggeringResponse response;
1715
0
    __UA_Client_Service(client, &request, &UA_TYPES[UA_TYPES_SETTRIGGERINGREQUEST],
1716
0
                        &response, &UA_TYPES[UA_TYPES_SETTRIGGERINGRESPONSE]);
1717
0
    return response;
1718
0
}
1719
1720
UA_StatusCode
1721
UA_Client_MonitoredItems_setTriggering_async(UA_Client *client,
1722
                                             const UA_SetTriggeringRequest request,
1723
                                             UA_ClientAsyncSetTriggeringCallback callback,
1724
0
                                             void *userdata, UA_UInt32 *requestId) {
1725
0
    return __UA_Client_AsyncService(client, &request,
1726
0
                                    &UA_TYPES[UA_TYPES_SETTRIGGERINGREQUEST],
1727
0
                                    (UA_ClientAsyncServiceCallback)callback,
1728
0
                                    &UA_TYPES[UA_TYPES_SETTRIGGERINGRESPONSE],
1729
0
                                    userdata, requestId);
1730
0
}