Coverage Report

Created: 2025-07-18 06:17

/src/open62541/src/server/ua_services_subscription.c
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2014-2018, 2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2016-2017 (c) Florian Palm
7
 *    Copyright 2015 (c) Chris Iatrou
8
 *    Copyright 2015-2016 (c) Sten Grüner
9
 *    Copyright 2015-2016 (c) Oleksiy Vasylyev
10
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
11
 *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
12
 *    Copyright 2017 (c) Mattias Bornhager
13
 *    Copyright 2017 (c) Henrik Norrman
14
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
15
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
16
 *    Copyright 2017-2019 (c) HMS Industrial Networks AB (Author: Jonas Green)
17
 */
18
19
#include "ua_server_internal.h"
20
#include "ua_services.h"
21
#include "ua_subscription.h"
22
23
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
24
25
static void
26
setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
27
                        UA_Double requestedPublishingInterval,
28
                        UA_UInt32 requestedLifetimeCount,
29
                        UA_UInt32 requestedMaxKeepAliveCount,
30
                        UA_UInt32 maxNotificationsPerPublish,
31
0
                        UA_Byte priority) {
32
0
    UA_LOCK_ASSERT(&server->serviceMutex);
33
34
    /* re-parameterize the subscription */
35
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
36
0
                               requestedPublishingInterval,
37
0
                               subscription->publishingInterval);
38
    /* check for nan*/
39
0
    if(requestedPublishingInterval != requestedPublishingInterval)
40
0
        subscription->publishingInterval = server->config.publishingIntervalLimits.min;
41
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
42
0
                               requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
43
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
44
0
                               requestedLifetimeCount, subscription->lifeTimeCount);
45
0
    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
46
0
        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
47
0
    subscription->notificationsPerPublish = maxNotificationsPerPublish;
48
0
    if(maxNotificationsPerPublish == 0 ||
49
0
       maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
50
0
        subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
51
0
    subscription->priority = priority;
52
0
}
53
54
UA_Boolean
55
Service_CreateSubscription(UA_Server *server, UA_Session *session,
56
                           const UA_CreateSubscriptionRequest *request,
57
0
                           UA_CreateSubscriptionResponse *response) {
58
0
    UA_LOCK_ASSERT(&server->serviceMutex);
59
60
    /* Check limits for the number of subscriptions */
61
0
    if(((server->config.maxSubscriptions != 0) &&
62
0
        (server->subscriptionsSize >= server->config.maxSubscriptions)) ||
63
0
       ((server->config.maxSubscriptionsPerSession != 0) &&
64
0
        (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession))) {
65
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
66
0
        return true;
67
0
    }
68
69
    /* Create the subscription */
70
0
    UA_Subscription *sub = UA_Subscription_new();
71
0
    if(!sub) {
72
0
        UA_LOG_DEBUG_SESSION(server->config.logging, session,
73
0
                             "Processing CreateSubscriptionRequest failed");
74
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
75
0
        return true;
76
0
    }
77
78
    /* Set the subscription parameters */
79
0
    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
80
0
                            request->requestedLifetimeCount,
81
0
                            request->requestedMaxKeepAliveCount,
82
0
                            request->maxNotificationsPerPublish, request->priority);
83
0
    sub->subscriptionId = ++server->lastSubscriptionId;  /* Assign the SubscriptionId */
84
85
    /* Register the subscription in the server */
86
0
    LIST_INSERT_HEAD(&server->subscriptions, sub, serverListEntry);
87
0
    server->subscriptionsSize++;
88
89
    /* Update the server statistics */
90
0
    server->serverDiagnosticsSummary.currentSubscriptionCount++;
91
0
    server->serverDiagnosticsSummary.cumulatedSubscriptionCount++;
92
93
    /* Attach the Subscription to the session */
94
0
    UA_Session_attachSubscription(session, sub);
95
96
    /* Create representation in the Session object */
97
0
#ifdef UA_ENABLE_DIAGNOSTICS
98
0
    createSubscriptionObject(server, session, sub);
99
0
#endif
100
101
    /* Set the subscription state. This also registers the callback.
102
     * Note that also a disabled subscription publishes keepalives. */
103
0
    UA_SubscriptionState sState = (request->publishingEnabled) ?
104
0
        UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH;
105
0
    UA_StatusCode res = Subscription_setState(server, sub, sState);
106
0
    if(res != UA_STATUSCODE_GOOD) {
107
0
        UA_LOG_DEBUG_SESSION(server->config.logging, sub->session,
108
0
                             "Subscription %" PRIu32 " | Could not register "
109
0
                             "publish callback with error code %s",
110
0
                             sub->subscriptionId, UA_StatusCode_name(res));
111
0
        response->responseHeader.serviceResult = res;
112
0
        UA_Subscription_delete(server, sub);
113
0
        return true;
114
0
    }
115
116
0
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub,
117
0
                             "Subscription created (Publishing interval %.2fms, "
118
0
                             "max %lu notifications per publish)",
119
0
                             sub->publishingInterval,
120
0
                             (long unsigned)sub->notificationsPerPublish);
121
122
    /* Prepare the response */
123
0
    response->subscriptionId = sub->subscriptionId;
124
0
    response->revisedPublishingInterval = sub->publishingInterval;
125
0
    response->revisedLifetimeCount = sub->lifeTimeCount;
126
0
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
127
128
0
    return true;
129
0
}
130
131
UA_Boolean
132
Service_ModifySubscription(UA_Server *server, UA_Session *session,
133
                           const UA_ModifySubscriptionRequest *request,
134
0
                           UA_ModifySubscriptionResponse *response) {
135
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
136
0
                         "Processing ModifySubscriptionRequest");
137
0
    UA_LOCK_ASSERT(&server->serviceMutex);
138
139
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
140
0
    if(!sub) {
141
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
142
0
        return true;
143
0
    }
144
145
    /* Store the old publishing interval */
146
0
    UA_Double oldPublishingInterval = sub->publishingInterval;
147
0
    UA_Byte oldPriority = sub->priority;
148
149
    /* Change the Subscription settings */
150
0
    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
151
0
                            request->requestedLifetimeCount,
152
0
                            request->requestedMaxKeepAliveCount,
153
0
                            request->maxNotificationsPerPublish, request->priority);
154
155
    /* Reset the subscription lifetime */
156
0
    Subscription_resetLifetime(sub);
157
158
    /* The publish interval has changed */
159
0
    if(sub->publishingInterval != oldPublishingInterval) {
160
        /* Change the repeated callback to the new interval. This cannot fail as
161
         * memory is reused. */
162
0
        if(sub->publishCallbackId > 0)
163
0
            changeRepeatedCallbackInterval(server, sub->publishCallbackId,
164
0
                                           sub->publishingInterval);
165
166
        /* For each MonitoredItem check if it was/shall be attached to the
167
         * publish interval. This ensures that we have less cyclic callbacks
168
         * registered and that the notifications are fresh. */
169
0
        UA_MonitoredItem *mon;
170
0
        LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
171
0
            if(mon->parameters.samplingInterval == sub->publishingInterval ||
172
0
               mon->parameters.samplingInterval == oldPublishingInterval) {
173
0
                UA_MonitoredItem_unregisterSampling(server, mon);
174
0
                UA_MonitoredItem_registerSampling(server, mon);
175
0
            }
176
0
        }
177
0
    }
178
179
    /* If the priority has changed, re-enter the subscription to the
180
     * priority-ordered queue in the session. */
181
0
    if(oldPriority != sub->priority) {
182
0
        UA_Session_detachSubscription(server, session, sub, false);
183
0
        UA_Session_attachSubscription(session, sub);
184
0
    }
185
186
    /* Set the response */
187
0
    response->revisedPublishingInterval = sub->publishingInterval;
188
0
    response->revisedLifetimeCount = sub->lifeTimeCount;
189
0
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
190
191
    /* Update the diagnostics statistics */
192
0
#ifdef UA_ENABLE_DIAGNOSTICS
193
0
    sub->modifyCount++;
194
0
#endif
195
196
0
    return true;
197
0
}
198
199
static void
200
Operation_SetPublishingMode(UA_Server *server, UA_Session *session,
201
                            const UA_Boolean *publishingEnabled,
202
                            const UA_UInt32 *subscriptionId,
203
0
                            UA_StatusCode *result) {
204
0
    UA_LOCK_ASSERT(&server->serviceMutex);
205
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
206
0
    if(!sub) {
207
0
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
208
0
        return;
209
0
    }
210
211
    /* Enable/disable */
212
0
    UA_SubscriptionState sState = (*publishingEnabled) ?
213
0
        UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH;
214
0
    *result = Subscription_setState(server, sub, sState);
215
216
    /* Reset the lifetime counter */
217
0
    Subscription_resetLifetime(sub);
218
0
}
219
220
UA_Boolean
221
Service_SetPublishingMode(UA_Server *server, UA_Session *session,
222
                          const UA_SetPublishingModeRequest *request,
223
0
                          UA_SetPublishingModeResponse *response) {
224
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
225
0
                         "Processing SetPublishingModeRequest");
226
0
    UA_LOCK_ASSERT(&server->serviceMutex);
227
228
0
    UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */
229
0
    response->responseHeader.serviceResult =
230
0
        allocProcessServiceOperations(server, session,
231
0
                                      (UA_ServiceOperation)Operation_SetPublishingMode,
232
0
                                      &publishingEnabled, &request->subscriptionIdsSize,
233
0
                                      &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize,
234
0
                                      &UA_TYPES[UA_TYPES_STATUSCODE]);
235
236
0
    return true;
237
0
}
238
239
UA_Boolean
240
Service_Publish(UA_Server *server, UA_Session *session,
241
                const UA_PublishRequest *request,
242
0
                UA_PublishResponse *response) {
243
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
244
0
                         "Processing PublishRequest with RequestId %u",
245
0
                         server->asyncManager.currentRequestId);
246
0
    UA_LOCK_ASSERT(&server->serviceMutex);
247
248
    /* Return an error if the session has no subscription */
249
0
    if(TAILQ_EMPTY(&session->subscriptions)) {
250
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
251
0
        return true;
252
0
    }
253
254
    /* Handle too many subscriptions to free resources before trying to allocate
255
     * resources for the new publish request. If the limit has been reached the
256
     * oldest publish request are returned with an error message. */
257
0
    UA_Session_ensurePublishQueueSpace(server, session);
258
259
    /* Allocate the response to store it in the retransmission queue */
260
0
    UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *)
261
0
        UA_malloc(sizeof(UA_PublishResponseEntry));
262
0
    if(!entry) {
263
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
264
0
        return true;
265
0
    }
266
267
    /* Prepare the response */
268
0
    UA_PublishResponse *entry_response = &entry->response;
269
0
    UA_PublishResponse_init(entry_response);
270
271
    /* Allocate the results array to acknowledge the acknowledge */
272
0
    if(request->subscriptionAcknowledgementsSize > 0) {
273
0
        entry_response->results = (UA_StatusCode *)
274
0
            UA_Array_new(request->subscriptionAcknowledgementsSize,
275
0
                         &UA_TYPES[UA_TYPES_STATUSCODE]);
276
0
        if(!entry_response->results) {
277
0
            UA_free(entry);
278
0
            response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
279
0
            return true;
280
0
        }
281
0
        entry_response->resultsSize = request->subscriptionAcknowledgementsSize;
282
0
    }
283
284
    /* <--- Async response from here on ---> */
285
286
0
    entry->requestId = server->asyncManager.currentRequestId;
287
0
    entry_response->responseHeader.requestHandle = request->requestHeader.requestHandle;
288
289
    /* Delete Acknowledged Subscription Messages */
290
0
    for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) {
291
0
        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
292
0
        UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId);
293
0
        if(!sub) {
294
0
            entry_response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
295
0
            UA_LOG_DEBUG_SESSION(server->config.logging, session,
296
0
                                 "Cannot process acknowledgements subscription %u" PRIu32,
297
0
                                 ack->subscriptionId);
298
0
            continue;
299
0
        }
300
        /* Remove the acked transmission from the retransmission queue */
301
0
        entry_response->results[i] =
302
0
            UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
303
0
    }
304
305
    /* Set the maxTime if a timeout hint is defined */
306
0
    entry->maxTime = UA_INT64_MAX;
307
0
    if(request->requestHeader.timeoutHint > 0) {
308
0
        UA_EventLoop *el = server->config.eventLoop;
309
0
        entry->maxTime = el->dateTime_nowMonotonic(el) +
310
0
            (request->requestHeader.timeoutHint * UA_DATETIME_MSEC);
311
0
    }
312
313
    /* Queue the publish response. It will be dequeued in a repeated publish
314
     * callback. This can also be triggered right now for a late
315
     * subscription. */
316
0
    UA_Session_queuePublishReq(session, entry, false);
317
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session, "Queued a publication message");
318
319
    /* If there are late subscriptions, the new publish request is used to
320
     * answer them immediately. Late subscriptions with higher priority are
321
     * considered earlier. However, a single subscription that generates many
322
     * notifications must not "starve" other late subscriptions. Hence we move
323
     * it to the end of the queue for the subscriptions of that priority. */
324
0
    UA_Subscription *late, *late_tmp;
325
0
    TAILQ_FOREACH_SAFE(late, &session->subscriptions, sessionListEntry, late_tmp) {
326
        /* Skip non-late subscriptions */
327
0
        if(!late->late)
328
0
            continue;
329
330
        /* Call publish on the late subscription */
331
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, late,
332
0
                                  "Send PublishResponse on a late subscription");
333
0
        UA_Subscription_publish(server, late);
334
335
        /* Skip re-insert if the subscription was deleted or deactivated during
336
         * _publish */
337
0
        if(late->state >= UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH) {
338
            /* Find the first element with smaller priority and insert before
339
             * that. If there is none, insert at the end of the queue. */
340
0
            UA_Subscription *after = TAILQ_NEXT(late, sessionListEntry);
341
0
            while(after && after->priority >= late->priority)
342
0
                after = TAILQ_NEXT(after, sessionListEntry);
343
0
            TAILQ_REMOVE(&session->subscriptions, late, sessionListEntry);
344
0
            if(after)
345
0
                TAILQ_INSERT_BEFORE(after, late, sessionListEntry);
346
0
            else
347
0
                TAILQ_INSERT_TAIL(&session->subscriptions, late, sessionListEntry);
348
0
        }
349
350
        /* Responses left in the queue? */
351
0
        if(session->responseQueueSize == 0)
352
0
            break;
353
0
    }
354
355
0
    return false;
356
0
}
357
358
static void
359
Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_,
360
0
                             const UA_UInt32 *subscriptionId, UA_StatusCode *result) {
361
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
362
0
    if(!sub) {
363
0
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
364
0
        UA_LOG_DEBUG_SESSION(server->config.logging, session,
365
0
                             "Deleting Subscription with Id %" PRIu32
366
0
                             " failed with error code %s",
367
0
                             *subscriptionId, UA_StatusCode_name(*result));
368
0
        return;
369
0
    }
370
371
0
    UA_Subscription_delete(server, sub);
372
0
    *result = UA_STATUSCODE_GOOD;
373
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
374
0
                         "Subscription %" PRIu32 " | Subscription deleted",
375
0
                         *subscriptionId);
376
0
}
377
378
UA_Boolean
379
Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
380
                            const UA_DeleteSubscriptionsRequest *request,
381
0
                            UA_DeleteSubscriptionsResponse *response) {
382
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
383
0
                         "Processing DeleteSubscriptionsRequest");
384
0
    UA_LOCK_ASSERT(&server->serviceMutex);
385
386
0
    response->responseHeader.serviceResult =
387
0
        allocProcessServiceOperations(server, session,
388
0
                                      (UA_ServiceOperation)Operation_DeleteSubscription,
389
0
                                      NULL, &request->subscriptionIdsSize,
390
0
                                      &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize,
391
0
                                      &UA_TYPES[UA_TYPES_STATUSCODE]);
392
0
    return true;
393
0
}
394
395
UA_Boolean
396
Service_Republish(UA_Server *server, UA_Session *session,
397
                  const UA_RepublishRequest *request,
398
0
                  UA_RepublishResponse *response) {
399
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
400
0
                         "Processing RepublishRequest");
401
0
    UA_LOCK_ASSERT(&server->serviceMutex);
402
403
    /* Get the subscription */
404
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
405
0
    if(!sub) {
406
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
407
0
        return true;
408
0
    }
409
410
    /* Reset the lifetime counter */
411
0
    Subscription_resetLifetime(sub);
412
413
    /* Update the subscription statistics */
414
0
#ifdef UA_ENABLE_DIAGNOSTICS
415
0
    sub->republishRequestCount++;
416
0
#endif
417
418
    /* Find the notification in the retransmission queue  */
419
0
    UA_NotificationMessageEntry *entry;
420
0
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
421
0
        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
422
0
            break;
423
0
    }
424
0
    if(!entry) {
425
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
426
0
        return true;
427
0
    }
428
429
0
    response->responseHeader.serviceResult =
430
0
        UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
431
432
    /* Update the subscription statistics for the case where we return a message */
433
0
#ifdef UA_ENABLE_DIAGNOSTICS
434
0
    sub->republishMessageCount++;
435
0
#endif
436
437
0
    return true;
438
0
}
439
440
static UA_StatusCode
441
0
setTransferredSequenceNumbers(const UA_Subscription *sub, UA_TransferResult *result) {
442
    /* Allocate memory */
443
0
    result->availableSequenceNumbers = (UA_UInt32*)
444
0
        UA_Array_new(sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]);
445
0
    if(!result->availableSequenceNumbers)
446
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
447
0
    result->availableSequenceNumbersSize = sub->retransmissionQueueSize;
448
449
    /* Copy over the sequence numbers */
450
0
    UA_NotificationMessageEntry *entry;
451
0
    size_t i = 0;
452
0
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
453
0
        result->availableSequenceNumbers[i] = entry->message.sequenceNumber;
454
0
        i++;
455
0
    }
456
457
0
    UA_assert(i == result->availableSequenceNumbersSize);
458
459
0
    return UA_STATUSCODE_GOOD;
460
0
}
461
462
static void
463
Operation_TransferSubscription(UA_Server *server, UA_Session *session,
464
                               const UA_Boolean *sendInitialValues,
465
                               const UA_UInt32 *subscriptionId,
466
0
                               UA_TransferResult *result) {
467
    /* Get the subscription. This requires a server-wide lookup instead of the
468
     * usual session-wide lookup. */
469
0
    UA_Subscription *sub = getSubscriptionById(server, *subscriptionId);
470
0
    if(!sub) {
471
0
        result->statusCode = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
472
0
        return;
473
0
    }
474
475
    /* Update the diagnostics statistics */
476
0
#ifdef UA_ENABLE_DIAGNOSTICS
477
0
    sub->transferRequestCount++;
478
0
#endif
479
480
    /* Is this the same session? Return the sequence numbers and do nothing else. */
481
0
    UA_Session *oldSession = sub->session;
482
0
    if(oldSession == session) {
483
0
        result->statusCode = setTransferredSequenceNumbers(sub, result);
484
0
#ifdef UA_ENABLE_DIAGNOSTICS
485
0
        sub->transferredToSameClientCount++;
486
0
#endif
487
0
        return;
488
0
    }
489
490
    /* Check with AccessControl if the transfer is allowed */
491
0
    if(server->config.accessControl.allowTransferSubscription) {
492
0
        UA_LOCK_ASSERT(&server->serviceMutex);
493
0
        if(!server->config.accessControl.
494
0
           allowTransferSubscription(server, &server->config.accessControl,
495
0
                                     oldSession ? &oldSession->sessionId : NULL,
496
0
                                     oldSession ? oldSession->context : NULL,
497
0
                                     &session->sessionId, session->context)) {
498
0
            result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED;
499
0
            return;
500
0
        }
501
0
    } else {
502
0
        result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED;
503
0
        return;
504
0
    }
505
506
    /* Check limits for the number of subscriptions for this Session */
507
0
    if((server->config.maxSubscriptionsPerSession != 0) &&
508
0
       (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession)) {
509
0
        result->statusCode = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
510
0
        return;
511
0
    }
512
513
    /* Allocate memory for the new subscription */
514
0
    UA_Subscription *newSub = (UA_Subscription*)UA_malloc(sizeof(UA_Subscription));
515
0
    if(!newSub) {
516
0
        result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
517
0
        return;
518
0
    }
519
520
    /* Set the available sequence numbers */
521
0
    result->statusCode = setTransferredSequenceNumbers(sub, result);
522
0
    if(result->statusCode != UA_STATUSCODE_GOOD) {
523
0
        UA_free(newSub);
524
0
        return;
525
0
    }
526
527
    /* Create an identical copy of the Subscription struct. The original
528
     * subscription remains in place until a StatusChange notification has been
529
     * sent. The elements for lists and queues are moved over manually to ensure
530
     * that all backpointers are set correctly. */
531
0
    memcpy(newSub, sub, sizeof(UA_Subscription));
532
533
    /* Set to the same state as the original subscription */
534
0
    newSub->publishCallbackId = 0;
535
0
    result->statusCode = Subscription_setState(server, newSub, sub->state);
536
0
    if(result->statusCode != UA_STATUSCODE_GOOD) {
537
0
        UA_Array_delete(result->availableSequenceNumbers,
538
0
                        sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]);
539
0
        result->availableSequenceNumbers = NULL;
540
0
        result->availableSequenceNumbersSize = 0;
541
0
        UA_free(newSub);
542
0
        return;
543
0
    }
544
545
    /* <-- The point of no return --> */
546
547
    /* Move over the MonitoredItems and adjust the backpointers */
548
0
    LIST_INIT(&newSub->monitoredItems);
549
0
    UA_MonitoredItem *mon, *mon_tmp;
550
0
    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
551
0
        LIST_REMOVE(mon, listEntry);
552
0
        mon->subscription = newSub;
553
0
        LIST_INSERT_HEAD(&newSub->monitoredItems, mon, listEntry);
554
0
    }
555
0
    sub->monitoredItemsSize = 0;
556
557
    /* Move over the notification queue */
558
0
    TAILQ_INIT(&newSub->notificationQueue);
559
0
    UA_Notification *nn, *nn_tmp;
560
0
    TAILQ_FOREACH_SAFE(nn, &sub->notificationQueue, subEntry, nn_tmp) {
561
0
        TAILQ_REMOVE(&sub->notificationQueue, nn, subEntry);
562
0
        TAILQ_INSERT_TAIL(&newSub->notificationQueue, nn, subEntry);
563
0
    }
564
0
    sub->notificationQueueSize = 0;
565
0
    sub->dataChangeNotifications = 0;
566
0
    sub->eventNotifications = 0;
567
568
0
    TAILQ_INIT(&newSub->retransmissionQueue);
569
0
    UA_NotificationMessageEntry *nme, *nme_tmp;
570
0
    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
571
0
        TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
572
0
        TAILQ_INSERT_TAIL(&newSub->retransmissionQueue, nme, listEntry);
573
0
        if(oldSession)
574
0
            oldSession->totalRetransmissionQueueSize -= 1;
575
0
        sub->retransmissionQueueSize -= 1;
576
0
    }
577
0
    UA_assert(sub->retransmissionQueueSize == 0);
578
0
    sub->retransmissionQueueSize = 0;
579
580
    /* Add to the server */
581
0
    UA_assert(newSub->subscriptionId == sub->subscriptionId);
582
0
    LIST_INSERT_HEAD(&server->subscriptions, newSub, serverListEntry);
583
0
    server->subscriptionsSize++;
584
585
    /* Attach to the session */
586
0
    UA_Session_attachSubscription(session, newSub);
587
588
0
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, newSub, "Transferred to this Session");
589
590
    /* Set StatusChange in the original subscription and force publish. This
591
     * also removes the Subscription, even if there was no PublishResponse
592
     * queued to send a StatusChangeNotification. */
593
0
    sub->statusChange = UA_STATUSCODE_GOODSUBSCRIPTIONTRANSFERRED;
594
0
    UA_Subscription_publish(server, sub);
595
596
    /* Re-create notifications with the current values for the new subscription */
597
0
    if(*sendInitialValues)
598
0
        UA_Subscription_resendData(server, newSub);
599
600
    /* Do not update the statistics for the number of Subscriptions here. The
601
     * fact that we duplicate the subscription and move over the content is just
602
     * an implementtion detail.
603
     * server->serverDiagnosticsSummary.currentSubscriptionCount++;
604
     * server->serverDiagnosticsSummary.cumulatedSubscriptionCount++;
605
     *
606
     * Update the diagnostics statistics: */
607
0
#ifdef UA_ENABLE_DIAGNOSTICS
608
0
    if(oldSession &&
609
0
       UA_equal(&oldSession->clientDescription, &session->clientDescription,
610
0
                &UA_TYPES[UA_TYPES_APPLICATIONDESCRIPTION]))
611
0
        sub->transferredToSameClientCount++;
612
0
    else
613
0
        sub->transferredToAltClientCount++;
614
0
#endif
615
0
}
616
617
UA_Boolean
618
Service_TransferSubscriptions(UA_Server *server, UA_Session *session,
619
                              const UA_TransferSubscriptionsRequest *request,
620
0
                              UA_TransferSubscriptionsResponse *response) {
621
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
622
0
                         "Processing TransferSubscriptionsRequest");
623
0
    UA_LOCK_ASSERT(&server->serviceMutex);
624
625
0
    response->responseHeader.serviceResult =
626
0
        allocProcessServiceOperations(server, session,
627
0
                                      (UA_ServiceOperation)Operation_TransferSubscription,
628
0
                                      &request->sendInitialValues,
629
0
                                      &request->subscriptionIdsSize,
630
0
                                      &UA_TYPES[UA_TYPES_UINT32],
631
0
                                      &response->resultsSize,
632
0
                                      &UA_TYPES[UA_TYPES_TRANSFERRESULT]);
633
0
    return true;
634
0
}
635
636
#endif /* UA_ENABLE_SUBSCRIPTIONS */