Coverage Report

Created: 2026-01-10 06:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/src/server/ua_services_subscription.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 *    Copyright 2014-2018, 2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
6
 *    Copyright 2016-2017 (c) Florian Palm
7
 *    Copyright 2015 (c) Chris Iatrou
8
 *    Copyright 2015-2016 (c) Sten GrĂ¼ner
9
 *    Copyright 2015-2016 (c) Oleksiy Vasylyev
10
 *    Copyright 2017 (c) Stefan Profanter, fortiss GmbH
11
 *    Copyright 2018 (c) Ari Breitkreuz, fortiss GmbH
12
 *    Copyright 2017 (c) Mattias Bornhager
13
 *    Copyright 2017 (c) Henrik Norrman
14
 *    Copyright 2017-2018 (c) Thomas Stalder, Blue Time Concept SA
15
 *    Copyright 2018 (c) Fabian Arndt, Root-Core
16
 *    Copyright 2017-2019 (c) HMS Industrial Networks AB (Author: Jonas Green)
17
 */
18
19
#include "ua_server_internal.h"
20
#include "ua_services.h"
21
#include "ua_subscription.h"
22
23
#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */
24
25
static void
26
setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
27
                        UA_Double requestedPublishingInterval,
28
                        UA_UInt32 requestedLifetimeCount,
29
                        UA_UInt32 requestedMaxKeepAliveCount,
30
                        UA_UInt32 maxNotificationsPerPublish,
31
0
                        UA_Byte priority) {
32
0
    UA_LOCK_ASSERT(&server->serviceMutex);
33
34
    /* re-parameterize the subscription */
35
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.publishingIntervalLimits,
36
0
                               requestedPublishingInterval,
37
0
                               subscription->publishingInterval);
38
    /* check for nan*/
39
0
    if(requestedPublishingInterval != requestedPublishingInterval)
40
0
        subscription->publishingInterval = server->config.publishingIntervalLimits.min;
41
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.keepAliveCountLimits,
42
0
                               requestedMaxKeepAliveCount, subscription->maxKeepAliveCount);
43
0
    UA_BOUNDEDVALUE_SETWBOUNDS(server->config.lifeTimeCountLimits,
44
0
                               requestedLifetimeCount, subscription->lifeTimeCount);
45
0
    if(subscription->lifeTimeCount < 3 * subscription->maxKeepAliveCount)
46
0
        subscription->lifeTimeCount = 3 * subscription->maxKeepAliveCount;
47
0
    subscription->notificationsPerPublish = maxNotificationsPerPublish;
48
0
    if(maxNotificationsPerPublish == 0 ||
49
0
       maxNotificationsPerPublish > server->config.maxNotificationsPerPublish)
50
0
        subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
51
0
    subscription->priority = priority;
52
0
}
53
54
static void
55
notifySubscription(UA_Server *server, UA_Subscription *sub,
56
0
                   UA_ApplicationNotificationType type) {
57
0
    if(!server->config.subscriptionNotificationCallback &&
58
0
       !server->config.globalNotificationCallback)
59
0
        return;
60
61
0
    static UA_THREAD_LOCAL UA_KeyValuePair createSubData[8] = {
62
0
        {{0, UA_STRING_STATIC("session-id")}, {0}},
63
0
        {{0, UA_STRING_STATIC("subscription-id")}, {0}},
64
0
        {{0, UA_STRING_STATIC("publishing-interval")}, {0}},
65
0
        {{0, UA_STRING_STATIC("lifetime-count")}, {0}},
66
0
        {{0, UA_STRING_STATIC("max-keepalive-count")}, {0}},
67
0
        {{0, UA_STRING_STATIC("max-notifications-per-publish")}, {0}},
68
0
        {{0, UA_STRING_STATIC("priority")}, {0}},
69
0
        {{0, UA_STRING_STATIC("publishing-enabled")}, {0}}
70
0
    };
71
0
    UA_KeyValueMap createSubMap = {8, createSubData};
72
73
0
    UA_NodeId sessionId = (sub->session) ? sub->session->sessionId : UA_NODEID_NULL;
74
0
    UA_Boolean enabled = (sub->state = UA_SUBSCRIPTIONSTATE_ENABLED);
75
76
0
    UA_Variant_setScalar(&createSubData[0].value, &sessionId,
77
0
                         &UA_TYPES[UA_TYPES_NODEID]);
78
0
    UA_Variant_setScalar(&createSubData[1].value, &sub->subscriptionId,
79
0
                         &UA_TYPES[UA_TYPES_UINT32]);
80
0
    UA_Variant_setScalar(&createSubData[2].value, &sub->publishingInterval,
81
0
                         &UA_TYPES[UA_TYPES_DOUBLE]);
82
0
    UA_Variant_setScalar(&createSubData[3].value, &sub->lifeTimeCount,
83
0
                         &UA_TYPES[UA_TYPES_UINT32]);
84
0
    UA_Variant_setScalar(&createSubData[4].value, &sub->maxKeepAliveCount,
85
0
                         &UA_TYPES[UA_TYPES_UINT32]);
86
0
    UA_Variant_setScalar(&createSubData[5].value, &sub->notificationsPerPublish,
87
0
                         &UA_TYPES[UA_TYPES_UINT32]);
88
0
    UA_Variant_setScalar(&createSubData[6].value, &sub->priority,
89
0
                         &UA_TYPES[UA_TYPES_BYTE]);
90
0
    UA_Variant_setScalar(&createSubData[7].value, &enabled,
91
0
                         &UA_TYPES[UA_TYPES_BOOLEAN]);
92
93
0
    if(server->config.subscriptionNotificationCallback)
94
0
        server->config.subscriptionNotificationCallback(server, type, createSubMap);
95
0
    if(server->config.globalNotificationCallback)
96
0
        server->config.globalNotificationCallback(server, type, createSubMap);
97
0
}
98
99
UA_Boolean
100
Service_CreateSubscription(UA_Server *server, UA_Session *session,
101
                           const UA_CreateSubscriptionRequest *request,
102
0
                           UA_CreateSubscriptionResponse *response) {
103
0
    UA_LOCK_ASSERT(&server->serviceMutex);
104
105
    /* Check limits for the number of subscriptions */
106
0
    if(((server->config.maxSubscriptions != 0) &&
107
0
        (server->subscriptionsSize >= server->config.maxSubscriptions)) ||
108
0
       ((server->config.maxSubscriptionsPerSession != 0) &&
109
0
        (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession))) {
110
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
111
0
        return true;
112
0
    }
113
114
    /* Create the subscription */
115
0
    UA_Subscription *sub = UA_Subscription_new();
116
0
    if(!sub) {
117
0
        UA_LOG_DEBUG_SESSION(server->config.logging, session,
118
0
                             "Processing CreateSubscriptionRequest failed");
119
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
120
0
        return true;
121
0
    }
122
123
    /* Set the subscription parameters */
124
0
    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
125
0
                            request->requestedLifetimeCount,
126
0
                            request->requestedMaxKeepAliveCount,
127
0
                            request->maxNotificationsPerPublish, request->priority);
128
0
    sub->subscriptionId = ++server->lastSubscriptionId;  /* Assign the SubscriptionId */
129
130
    /* Register the subscription in the server */
131
0
    LIST_INSERT_HEAD(&server->subscriptions, sub, serverListEntry);
132
0
    server->subscriptionsSize++;
133
134
    /* Update the server statistics */
135
0
    server->serverDiagnosticsSummary.currentSubscriptionCount++;
136
0
    server->serverDiagnosticsSummary.cumulatedSubscriptionCount++;
137
138
    /* Attach the Subscription to the session */
139
0
    UA_Session_attachSubscription(session, sub);
140
141
    /* Create representation in the Session object */
142
0
#ifdef UA_ENABLE_DIAGNOSTICS
143
0
    createSubscriptionObject(server, session, sub);
144
0
#endif
145
146
    /* Set the subscription state. This also registers the callback.
147
     * Note that also a disabled subscription publishes keepalives. */
148
0
    UA_SubscriptionState sState = (request->publishingEnabled) ?
149
0
        UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH;
150
0
    UA_StatusCode res = Subscription_setState(server, sub, sState);
151
0
    if(res != UA_STATUSCODE_GOOD) {
152
0
        UA_LOG_DEBUG_SESSION(server->config.logging, sub->session,
153
0
                             "Subscription %" PRIu32 " | Could not register "
154
0
                             "publish callback with error code %s",
155
0
                             sub->subscriptionId, UA_StatusCode_name(res));
156
0
        response->responseHeader.serviceResult = res;
157
0
        UA_Subscription_delete(server, sub);
158
0
        return true;
159
0
    }
160
161
0
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, sub,
162
0
                             "Subscription created (Publishing interval %.2fms, "
163
0
                             "max %lu notifications per publish)",
164
0
                             sub->publishingInterval,
165
0
                             (long unsigned)sub->notificationsPerPublish);
166
167
    /* Notify the application */
168
0
    notifySubscription(server, sub,
169
0
                       UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_CREATED);
170
171
    /* Prepare the response */
172
0
    response->subscriptionId = sub->subscriptionId;
173
0
    response->revisedPublishingInterval = sub->publishingInterval;
174
0
    response->revisedLifetimeCount = sub->lifeTimeCount;
175
0
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
176
177
0
    return true;
178
0
}
179
180
UA_Boolean
181
Service_ModifySubscription(UA_Server *server, UA_Session *session,
182
                           const UA_ModifySubscriptionRequest *request,
183
0
                           UA_ModifySubscriptionResponse *response) {
184
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
185
0
                         "Processing ModifySubscriptionRequest");
186
0
    UA_LOCK_ASSERT(&server->serviceMutex);
187
188
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
189
0
    if(!sub) {
190
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
191
0
        return true;
192
0
    }
193
194
    /* Store the old publishing interval */
195
0
    UA_Double oldPublishingInterval = sub->publishingInterval;
196
0
    UA_Byte oldPriority = sub->priority;
197
198
    /* Change the Subscription settings */
199
0
    setSubscriptionSettings(server, sub, request->requestedPublishingInterval,
200
0
                            request->requestedLifetimeCount,
201
0
                            request->requestedMaxKeepAliveCount,
202
0
                            request->maxNotificationsPerPublish, request->priority);
203
204
    /* Reset the subscription lifetime */
205
0
    Subscription_resetLifetime(sub);
206
207
    /* The publish interval has changed */
208
0
    if(sub->publishingInterval != oldPublishingInterval) {
209
        /* Change the repeated callback to the new interval. This cannot fail as
210
         * memory is reused. */
211
0
        if(sub->publishCallbackId > 0)
212
0
            changeRepeatedCallbackInterval(server, sub->publishCallbackId,
213
0
                                           sub->publishingInterval);
214
215
        /* For each MonitoredItem check if it was/shall be attached to the
216
         * publish interval. This ensures that we have less cyclic callbacks
217
         * registered and that the notifications are fresh. */
218
0
        UA_MonitoredItem *mon;
219
0
        LIST_FOREACH(mon, &sub->monitoredItems, listEntry) {
220
0
            if(mon->parameters.samplingInterval == sub->publishingInterval ||
221
0
               mon->parameters.samplingInterval == oldPublishingInterval) {
222
0
                UA_MonitoredItem_unregisterSampling(server, mon);
223
0
                UA_MonitoredItem_registerSampling(server, mon);
224
0
            }
225
0
        }
226
0
    }
227
228
    /* If the priority has changed, re-enter the subscription to the
229
     * priority-ordered queue in the session. */
230
0
    if(oldPriority != sub->priority) {
231
0
        UA_Session_detachSubscription(server, session, sub, false);
232
0
        UA_Session_attachSubscription(session, sub);
233
0
    }
234
235
    /* Notify the application */
236
0
    notifySubscription(server, sub,
237
0
                       UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_MODIFIED);
238
239
    /* Set the response */
240
0
    response->revisedPublishingInterval = sub->publishingInterval;
241
0
    response->revisedLifetimeCount = sub->lifeTimeCount;
242
0
    response->revisedMaxKeepAliveCount = sub->maxKeepAliveCount;
243
244
    /* Update the diagnostics statistics */
245
0
#ifdef UA_ENABLE_DIAGNOSTICS
246
0
    sub->modifyCount++;
247
0
#endif
248
249
0
    return true;
250
0
}
251
252
static void
253
Operation_SetPublishingMode(UA_Server *server, UA_Session *session,
254
                            const UA_Boolean *publishingEnabled,
255
                            const UA_UInt32 *subscriptionId,
256
0
                            UA_StatusCode *result) {
257
0
    UA_LOCK_ASSERT(&server->serviceMutex);
258
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
259
0
    if(!sub) {
260
0
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
261
0
        return;
262
0
    }
263
264
    /* Enable/disable */
265
0
    UA_SubscriptionState sState = (*publishingEnabled) ?
266
0
        UA_SUBSCRIPTIONSTATE_ENABLED : UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH;
267
0
    *result = Subscription_setState(server, sub, sState);
268
269
    /* Reset the lifetime counter */
270
0
    Subscription_resetLifetime(sub);
271
272
    /* Notify the application */
273
0
    notifySubscription(server, sub,
274
0
                       UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_PUBLISHINGMODE);
275
0
}
276
277
UA_Boolean
278
Service_SetPublishingMode(UA_Server *server, UA_Session *session,
279
                          const UA_SetPublishingModeRequest *request,
280
0
                          UA_SetPublishingModeResponse *response) {
281
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
282
0
                         "Processing SetPublishingModeRequest");
283
0
    UA_LOCK_ASSERT(&server->serviceMutex);
284
285
0
    UA_Boolean publishingEnabled = request->publishingEnabled; /* request is const */
286
0
    response->responseHeader.serviceResult =
287
0
        allocProcessServiceOperations(server, session,
288
0
                                      (UA_ServiceOperation)Operation_SetPublishingMode,
289
0
                                      &publishingEnabled, &request->subscriptionIdsSize,
290
0
                                      &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize,
291
0
                                      &UA_TYPES[UA_TYPES_STATUSCODE]);
292
293
0
    return true;
294
0
}
295
296
UA_Boolean
297
Service_Publish(UA_Server *server, UA_Session *session,
298
                const UA_PublishRequest *request,
299
0
                UA_PublishResponse *response) {
300
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
301
0
                         "Processing PublishRequest with RequestId %u",
302
0
                         server->asyncManager.currentRequestId);
303
0
    UA_LOCK_ASSERT(&server->serviceMutex);
304
305
    /* Return an error if the session has no subscription */
306
0
    if(TAILQ_EMPTY(&session->subscriptions)) {
307
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADNOSUBSCRIPTION;
308
0
        return true;
309
0
    }
310
311
    /* Handle too many subscriptions to free resources before trying to allocate
312
     * resources for the new publish request. If the limit has been reached the
313
     * oldest publish request are returned with an error message. */
314
0
    UA_Session_ensurePublishQueueSpace(server, session);
315
316
    /* Allocate the response to store it in the retransmission queue */
317
0
    UA_PublishResponseEntry *entry = (UA_PublishResponseEntry *)
318
0
        UA_malloc(sizeof(UA_PublishResponseEntry));
319
0
    if(!entry) {
320
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
321
0
        return true;
322
0
    }
323
324
    /* Prepare the response */
325
0
    UA_PublishResponse *entry_response = &entry->response;
326
0
    UA_PublishResponse_init(entry_response);
327
328
    /* Allocate the results array to acknowledge the acknowledge */
329
0
    if(request->subscriptionAcknowledgementsSize > 0) {
330
0
        entry_response->results = (UA_StatusCode *)
331
0
            UA_Array_new(request->subscriptionAcknowledgementsSize,
332
0
                         &UA_TYPES[UA_TYPES_STATUSCODE]);
333
0
        if(!entry_response->results) {
334
0
            UA_free(entry);
335
0
            response->responseHeader.serviceResult = UA_STATUSCODE_BADOUTOFMEMORY;
336
0
            return true;
337
0
        }
338
0
        entry_response->resultsSize = request->subscriptionAcknowledgementsSize;
339
0
    }
340
341
    /* <--- Async response from here on ---> */
342
343
0
    entry->requestId = server->asyncManager.currentRequestId;
344
0
    entry_response->responseHeader.requestHandle = request->requestHeader.requestHandle;
345
346
    /* Delete Acknowledged Subscription Messages */
347
0
    for(size_t i = 0; i < request->subscriptionAcknowledgementsSize; ++i) {
348
0
        UA_SubscriptionAcknowledgement *ack = &request->subscriptionAcknowledgements[i];
349
0
        UA_Subscription *sub = UA_Session_getSubscriptionById(session, ack->subscriptionId);
350
0
        if(!sub) {
351
0
            entry_response->results[i] = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
352
0
            UA_LOG_DEBUG_SESSION(server->config.logging, session,
353
0
                                 "Cannot process acknowledgements subscription %u" PRIu32,
354
0
                                 ack->subscriptionId);
355
0
            continue;
356
0
        }
357
        /* Remove the acked transmission from the retransmission queue */
358
0
        entry_response->results[i] =
359
0
            UA_Subscription_removeRetransmissionMessage(sub, ack->sequenceNumber);
360
0
    }
361
362
    /* Set the maxTime if a timeout hint is defined */
363
0
    entry->maxTime = UA_INT64_MAX;
364
0
    if(request->requestHeader.timeoutHint > 0) {
365
0
        UA_EventLoop *el = server->config.eventLoop;
366
0
        entry->maxTime = el->dateTime_nowMonotonic(el) +
367
0
            (request->requestHeader.timeoutHint * UA_DATETIME_MSEC);
368
0
    }
369
370
    /* Queue the publish response. It will be dequeued in a repeated publish
371
     * callback. This can also be triggered right now for a late
372
     * subscription. */
373
0
    UA_Session_queuePublishReq(session, entry, false);
374
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session, "Queued a publication message");
375
376
    /* If there are late subscriptions, the new publish request is used to
377
     * answer them immediately. Late subscriptions with higher priority are
378
     * considered earlier. However, a single subscription that generates many
379
     * notifications must not "starve" other late subscriptions. Hence we move
380
     * it to the end of the queue for the subscriptions of that priority. */
381
0
    UA_Subscription *late, *late_tmp;
382
0
    TAILQ_FOREACH_SAFE(late, &session->subscriptions, sessionListEntry, late_tmp) {
383
        /* Skip non-late subscriptions */
384
0
        if(!late->late)
385
0
            continue;
386
387
        /* Call publish on the late subscription */
388
0
        UA_LOG_DEBUG_SUBSCRIPTION(server->config.logging, late,
389
0
                                  "Send PublishResponse on a late subscription");
390
0
        UA_Subscription_publish(server, late);
391
392
        /* Skip re-insert if the subscription was deleted or deactivated during
393
         * _publish */
394
0
        if(late->state >= UA_SUBSCRIPTIONSTATE_ENABLED_NOPUBLISH) {
395
            /* Find the first element with smaller priority and insert before
396
             * that. If there is none, insert at the end of the queue. */
397
0
            UA_Subscription *after = TAILQ_NEXT(late, sessionListEntry);
398
0
            while(after && after->priority >= late->priority)
399
0
                after = TAILQ_NEXT(after, sessionListEntry);
400
0
            TAILQ_REMOVE(&session->subscriptions, late, sessionListEntry);
401
0
            if(after)
402
0
                TAILQ_INSERT_BEFORE(after, late, sessionListEntry);
403
0
            else
404
0
                TAILQ_INSERT_TAIL(&session->subscriptions, late, sessionListEntry);
405
0
        }
406
407
        /* Responses left in the queue? */
408
0
        if(session->responseQueueSize == 0)
409
0
            break;
410
0
    }
411
412
0
    return false;
413
0
}
414
415
static void
416
Operation_DeleteSubscription(UA_Server *server, UA_Session *session, void *_,
417
0
                             const UA_UInt32 *subscriptionId, UA_StatusCode *result) {
418
    /* Find the Subscription */
419
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, *subscriptionId);
420
0
    if(!sub) {
421
0
        *result = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
422
0
        UA_LOG_DEBUG_SESSION(server->config.logging, session,
423
0
                             "Deleting Subscription with Id %" PRIu32
424
0
                             " failed with error code %s",
425
0
                             *subscriptionId, UA_StatusCode_name(*result));
426
0
        return;
427
0
    }
428
429
    /* Notify the application */
430
0
    notifySubscription(server, sub,
431
0
                       UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_DELETED);
432
433
    /* Delete the Subscription */
434
0
    UA_Subscription_delete(server, sub);
435
0
    *result = UA_STATUSCODE_GOOD;
436
437
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
438
0
                         "Subscription %" PRIu32 " | Subscription deleted",
439
0
                         *subscriptionId);
440
0
}
441
442
UA_Boolean
443
Service_DeleteSubscriptions(UA_Server *server, UA_Session *session,
444
                            const UA_DeleteSubscriptionsRequest *request,
445
0
                            UA_DeleteSubscriptionsResponse *response) {
446
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
447
0
                         "Processing DeleteSubscriptionsRequest");
448
0
    UA_LOCK_ASSERT(&server->serviceMutex);
449
450
0
    response->responseHeader.serviceResult =
451
0
        allocProcessServiceOperations(server, session,
452
0
                                      (UA_ServiceOperation)Operation_DeleteSubscription,
453
0
                                      NULL, &request->subscriptionIdsSize,
454
0
                                      &UA_TYPES[UA_TYPES_UINT32], &response->resultsSize,
455
0
                                      &UA_TYPES[UA_TYPES_STATUSCODE]);
456
0
    return true;
457
0
}
458
459
UA_Boolean
460
Service_Republish(UA_Server *server, UA_Session *session,
461
                  const UA_RepublishRequest *request,
462
0
                  UA_RepublishResponse *response) {
463
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
464
0
                         "Processing RepublishRequest");
465
0
    UA_LOCK_ASSERT(&server->serviceMutex);
466
467
    /* Get the subscription */
468
0
    UA_Subscription *sub = UA_Session_getSubscriptionById(session, request->subscriptionId);
469
0
    if(!sub) {
470
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
471
0
        return true;
472
0
    }
473
474
    /* Reset the lifetime counter */
475
0
    Subscription_resetLifetime(sub);
476
477
    /* Update the subscription statistics */
478
0
#ifdef UA_ENABLE_DIAGNOSTICS
479
0
    sub->republishRequestCount++;
480
0
#endif
481
482
    /* Find the notification in the retransmission queue  */
483
0
    UA_NotificationMessageEntry *entry;
484
0
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
485
0
        if(entry->message.sequenceNumber == request->retransmitSequenceNumber)
486
0
            break;
487
0
    }
488
0
    if(!entry) {
489
0
        response->responseHeader.serviceResult = UA_STATUSCODE_BADMESSAGENOTAVAILABLE;
490
0
        return true;
491
0
    }
492
493
0
    response->responseHeader.serviceResult =
494
0
        UA_NotificationMessage_copy(&entry->message, &response->notificationMessage);
495
496
    /* Update the subscription statistics for the case where we return a message */
497
0
#ifdef UA_ENABLE_DIAGNOSTICS
498
0
    sub->republishMessageCount++;
499
0
#endif
500
501
0
    return true;
502
0
}
503
504
static UA_StatusCode
505
0
setTransferredSequenceNumbers(const UA_Subscription *sub, UA_TransferResult *result) {
506
    /* Allocate memory */
507
0
    result->availableSequenceNumbers = (UA_UInt32*)
508
0
        UA_Array_new(sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]);
509
0
    if(!result->availableSequenceNumbers)
510
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
511
0
    result->availableSequenceNumbersSize = sub->retransmissionQueueSize;
512
513
    /* Copy over the sequence numbers */
514
0
    UA_NotificationMessageEntry *entry;
515
0
    size_t i = 0;
516
0
    TAILQ_FOREACH(entry, &sub->retransmissionQueue, listEntry) {
517
0
        result->availableSequenceNumbers[i] = entry->message.sequenceNumber;
518
0
        i++;
519
0
    }
520
521
0
    UA_assert(i == result->availableSequenceNumbersSize);
522
523
0
    return UA_STATUSCODE_GOOD;
524
0
}
525
526
static void
527
Operation_TransferSubscription(UA_Server *server, UA_Session *session,
528
                               const UA_Boolean *sendInitialValues,
529
                               const UA_UInt32 *subscriptionId,
530
0
                               UA_TransferResult *result) {
531
0
    UA_LOCK_ASSERT(&server->serviceMutex);
532
533
    /* Get the subscription. This requires a server-wide lookup instead of the
534
     * usual session-wide lookup. */
535
0
    UA_Subscription *sub = getSubscriptionById(server, *subscriptionId);
536
0
    if(!sub) {
537
0
        result->statusCode = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
538
0
        return;
539
0
    }
540
541
    /* Update the diagnostics statistics */
542
0
#ifdef UA_ENABLE_DIAGNOSTICS
543
0
    sub->transferRequestCount++;
544
0
#endif
545
546
    /* Is this the same session? Return the sequence numbers and do nothing else. */
547
0
    UA_Session *oldSession = sub->session;
548
0
    if(oldSession == session) {
549
0
        result->statusCode = setTransferredSequenceNumbers(sub, result);
550
0
#ifdef UA_ENABLE_DIAGNOSTICS
551
0
        sub->transferredToSameClientCount++;
552
0
#endif
553
0
        return;
554
0
    }
555
556
    /* Check with AccessControl if the transfer is allowed */
557
0
    if(server->config.accessControl.allowTransferSubscription) {
558
0
        if(!server->config.accessControl.
559
0
           allowTransferSubscription(server, &server->config.accessControl,
560
0
                                     oldSession ? &oldSession->sessionId : NULL,
561
0
                                     oldSession ? oldSession->context : NULL,
562
0
                                     &session->sessionId, session->context)) {
563
0
            result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED;
564
0
            return;
565
0
        }
566
0
    } else {
567
0
        result->statusCode = UA_STATUSCODE_BADUSERACCESSDENIED;
568
0
        return;
569
0
    }
570
571
    /* Check limits for the number of subscriptions for this Session */
572
0
    if((server->config.maxSubscriptionsPerSession != 0) &&
573
0
       (session->subscriptionsSize >= server->config.maxSubscriptionsPerSession)) {
574
0
        result->statusCode = UA_STATUSCODE_BADTOOMANYSUBSCRIPTIONS;
575
0
        return;
576
0
    }
577
578
    /* Allocate memory for the new subscription */
579
0
    UA_Subscription *newSub = (UA_Subscription*)UA_malloc(sizeof(UA_Subscription));
580
0
    if(!newSub) {
581
0
        result->statusCode = UA_STATUSCODE_BADOUTOFMEMORY;
582
0
        return;
583
0
    }
584
585
    /* Set the available sequence numbers */
586
0
    result->statusCode = setTransferredSequenceNumbers(sub, result);
587
0
    if(result->statusCode != UA_STATUSCODE_GOOD) {
588
0
        UA_free(newSub);
589
0
        return;
590
0
    }
591
592
    /* Create an identical copy of the Subscription struct. The original
593
     * subscription remains in place until a StatusChange notification has been
594
     * sent. The elements for lists and queues are moved over manually to ensure
595
     * that all backpointers are set correctly. */
596
0
    memcpy(newSub, sub, sizeof(UA_Subscription));
597
598
    /* Set to the same state as the original subscription */
599
0
    newSub->publishCallbackId = 0;
600
0
    result->statusCode = Subscription_setState(server, newSub, sub->state);
601
0
    if(result->statusCode != UA_STATUSCODE_GOOD) {
602
0
        UA_Array_delete(result->availableSequenceNumbers,
603
0
                        sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]);
604
0
        result->availableSequenceNumbers = NULL;
605
0
        result->availableSequenceNumbersSize = 0;
606
0
        UA_free(newSub);
607
0
        return;
608
0
    }
609
610
    /* <-- The point of no return --> */
611
612
    /* Move over the MonitoredItems and adjust the backpointers */
613
0
    LIST_INIT(&newSub->monitoredItems);
614
0
    UA_MonitoredItem *mon, *mon_tmp;
615
0
    LIST_FOREACH_SAFE(mon, &sub->monitoredItems, listEntry, mon_tmp) {
616
0
        LIST_REMOVE(mon, listEntry);
617
0
        mon->subscription = newSub;
618
0
        LIST_INSERT_HEAD(&newSub->monitoredItems, mon, listEntry);
619
0
    }
620
0
    sub->monitoredItemsSize = 0;
621
622
    /* Move over the notification queue */
623
0
    TAILQ_INIT(&newSub->notificationQueue);
624
0
    UA_Notification *nn, *nn_tmp;
625
0
    TAILQ_FOREACH_SAFE(nn, &sub->notificationQueue, subEntry, nn_tmp) {
626
0
        TAILQ_REMOVE(&sub->notificationQueue, nn, subEntry);
627
0
        TAILQ_INSERT_TAIL(&newSub->notificationQueue, nn, subEntry);
628
0
    }
629
0
    sub->notificationQueueSize = 0;
630
0
    sub->dataChangeNotifications = 0;
631
0
    sub->eventNotifications = 0;
632
633
0
    TAILQ_INIT(&newSub->retransmissionQueue);
634
0
    UA_NotificationMessageEntry *nme, *nme_tmp;
635
0
    TAILQ_FOREACH_SAFE(nme, &sub->retransmissionQueue, listEntry, nme_tmp) {
636
0
        TAILQ_REMOVE(&sub->retransmissionQueue, nme, listEntry);
637
0
        TAILQ_INSERT_TAIL(&newSub->retransmissionQueue, nme, listEntry);
638
0
        if(oldSession)
639
0
            oldSession->totalRetransmissionQueueSize -= 1;
640
0
        sub->retransmissionQueueSize -= 1;
641
0
    }
642
0
    UA_assert(sub->retransmissionQueueSize == 0);
643
0
    sub->retransmissionQueueSize = 0;
644
645
    /* Add to the server */
646
0
    UA_assert(newSub->subscriptionId == sub->subscriptionId);
647
0
    LIST_INSERT_HEAD(&server->subscriptions, newSub, serverListEntry);
648
0
    server->subscriptionsSize++;
649
650
    /* Attach to the session */
651
0
    UA_Session_attachSubscription(session, newSub);
652
653
    /* Notify the application */
654
0
    notifySubscription(server, newSub,
655
0
                       UA_APPLICATIONNOTIFICATIONTYPE_SUBSCRIPTION_TRANSFERRED);
656
657
0
    UA_LOG_INFO_SUBSCRIPTION(server->config.logging, newSub,
658
0
                             "Transferred to this Session");
659
660
    /* Set StatusChange in the original subscription and force publish. This
661
     * also removes the Subscription, even if there was no PublishResponse
662
     * queued to send a StatusChangeNotification. */
663
0
    sub->statusChange = UA_STATUSCODE_GOODSUBSCRIPTIONTRANSFERRED;
664
0
    UA_Subscription_publish(server, sub);
665
666
    /* Re-create notifications with the current values for the new subscription */
667
0
    if(*sendInitialValues)
668
0
        UA_Subscription_resendData(server, newSub);
669
670
    /* Do not update the statistics for the number of Subscriptions here. The
671
     * fact that we duplicate the subscription and move over the content is just
672
     * an implementtion detail.
673
     * server->serverDiagnosticsSummary.currentSubscriptionCount++;
674
     * server->serverDiagnosticsSummary.cumulatedSubscriptionCount++;
675
     *
676
     * Update the diagnostics statistics: */
677
0
#ifdef UA_ENABLE_DIAGNOSTICS
678
0
    if(oldSession &&
679
0
       UA_equal(&oldSession->clientDescription, &session->clientDescription,
680
0
                &UA_TYPES[UA_TYPES_APPLICATIONDESCRIPTION]))
681
0
        sub->transferredToSameClientCount++;
682
0
    else
683
0
        sub->transferredToAltClientCount++;
684
0
#endif
685
0
}
686
687
UA_Boolean
688
Service_TransferSubscriptions(UA_Server *server, UA_Session *session,
689
                              const UA_TransferSubscriptionsRequest *request,
690
0
                              UA_TransferSubscriptionsResponse *response) {
691
0
    UA_LOG_DEBUG_SESSION(server->config.logging, session,
692
0
                         "Processing TransferSubscriptionsRequest");
693
0
    UA_LOCK_ASSERT(&server->serviceMutex);
694
695
0
    response->responseHeader.serviceResult =
696
0
        allocProcessServiceOperations(server, session,
697
0
                                      (UA_ServiceOperation)Operation_TransferSubscription,
698
0
                                      &request->sendInitialValues,
699
0
                                      &request->subscriptionIdsSize,
700
0
                                      &UA_TYPES[UA_TYPES_UINT32],
701
0
                                      &response->resultsSize,
702
0
                                      &UA_TYPES[UA_TYPES_TRANSFERRESULT]);
703
    return true;
704
0
}
705
706
#endif /* UA_ENABLE_SUBSCRIPTIONS */