Coverage Report

Created: 2025-08-26 06:30

/src/open62541/src/pubsub/ua_pubsub_manager.c
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner)
6
 * Copyright (c) 2018 Fraunhofer IOSB (Author: Julius Pfrommer)
7
 * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes)
8
 * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer)
9
 * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf)
10
 * Copyright (c) 2022 Linutronix GmbH (Author: Muddasir Shakil)
11
 */
12
13
#include "ua_pubsub_internal.h"
14
15
#ifdef UA_ENABLE_PUBSUB /* conditional compilation */
16
17
#ifdef UA_ENABLE_PUBSUB_SKS
18
#include "ua_pubsub_keystorage.h"
19
#endif
20
21
0
#define UA_DATETIMESTAMP_2000 125911584000000000
22
0
#define UA_RESERVEID_FIRST_ID 0x8000
23
24
static const char *pubSubStateNames[6] = {
25
    "Disabled", "Paused", "Operational", "Error", "PreOperational", "Invalid"
26
};
27
28
static void
29
UA_PubSubManager_stop(UA_ServerComponent *sc);
30
31
static UA_StatusCode
32
UA_PubSubManager_start(UA_ServerComponent *sc, UA_Server *server);
33
34
const char *
35
0
UA_PubSubState_name(UA_PubSubState state) {
36
0
    if(state < UA_PUBSUBSTATE_DISABLED || state > UA_PUBSUBSTATE_PREOPERATIONAL)
37
0
        return pubSubStateNames[5];
38
0
    return pubSubStateNames[state];
39
0
}
40
41
void
42
0
UA_PubSubComponentHead_clear(UA_PubSubComponentHead *psch) {
43
0
    UA_NodeId_clear(&psch->identifier);
44
0
    UA_String_clear(&psch->logIdString);
45
0
    memset(psch, 0, sizeof(UA_PubSubComponentHead));
46
0
}
47
48
UA_StatusCode
49
UA_PublisherId_copy(const UA_PublisherId *src,
50
0
                    UA_PublisherId *dst) {
51
0
    memcpy(dst, src, sizeof(UA_PublisherId));
52
0
    if(src->idType == UA_PUBLISHERIDTYPE_STRING)
53
0
        return UA_String_copy(&src->id.string, &dst->id.string);
54
0
    return UA_STATUSCODE_GOOD;
55
0
}
56
57
void
58
0
UA_PublisherId_clear(UA_PublisherId *p) {
59
0
    if(p->idType == UA_PUBLISHERIDTYPE_STRING)
60
0
        UA_String_clear(&p->id.string);
61
0
    memset(p, 0, sizeof(UA_PublisherId));
62
0
}
63
64
UA_StatusCode
65
0
UA_PublisherId_fromVariant(UA_PublisherId *p, const UA_Variant *src) {
66
0
    if(!UA_Variant_isScalar(src))
67
0
        return UA_STATUSCODE_BADINTERNALERROR;
68
69
0
    memset(p, 0, sizeof(UA_PublisherId));
70
71
0
    const void *data = (const void*)src->data;
72
0
    if(src->type == &UA_TYPES[UA_TYPES_BYTE]) {
73
0
        p->idType = UA_PUBLISHERIDTYPE_BYTE;
74
0
        p->id.byte = *(const UA_Byte*)data;
75
0
    } else if(src->type == &UA_TYPES[UA_TYPES_UINT16]) {
76
0
        p->idType  = UA_PUBLISHERIDTYPE_UINT16;
77
0
        p->id.uint16 = *(const UA_UInt16*)data;
78
0
    } else if(src->type == &UA_TYPES[UA_TYPES_UINT32]) {
79
0
        p->idType  = UA_PUBLISHERIDTYPE_UINT32;
80
0
        p->id.uint32 = *(const UA_UInt32*)data;
81
0
    } else if(src->type == &UA_TYPES[UA_TYPES_UINT64]) {
82
0
        p->idType  = UA_PUBLISHERIDTYPE_UINT64;
83
0
        p->id.uint64 = *(const UA_UInt64*)data;
84
0
    } else if(src->type == &UA_TYPES[UA_TYPES_STRING]) {
85
0
        p->idType  = UA_PUBLISHERIDTYPE_STRING;
86
0
        return UA_String_copy((const UA_String *)data, &p->id.string);
87
0
    } else {
88
0
        return UA_STATUSCODE_BADINTERNALERROR;
89
0
    }
90
0
    return UA_STATUSCODE_GOOD;
91
0
}
92
93
void
94
0
UA_PublisherId_toVariant(const UA_PublisherId *p, UA_Variant *dst) {
95
0
    UA_PublisherId *p2 = (UA_PublisherId*)(uintptr_t)p;
96
0
    switch(p->idType) {
97
0
    case UA_PUBLISHERIDTYPE_BYTE:
98
0
        UA_Variant_setScalar(dst, &p2->id.byte, &UA_TYPES[UA_TYPES_BYTE]); break;
99
0
    case UA_PUBLISHERIDTYPE_UINT16:
100
0
        UA_Variant_setScalar(dst, &p2->id.uint16, &UA_TYPES[UA_TYPES_UINT16]); break;
101
0
    case UA_PUBLISHERIDTYPE_UINT32:
102
0
        UA_Variant_setScalar(dst, &p2->id.uint32, &UA_TYPES[UA_TYPES_UINT32]); break;
103
0
    case UA_PUBLISHERIDTYPE_UINT64:
104
0
        UA_Variant_setScalar(dst, &p2->id.uint64, &UA_TYPES[UA_TYPES_UINT64]); break;
105
0
    case UA_PUBLISHERIDTYPE_STRING:
106
0
        UA_Variant_setScalar(dst, &p2->id.string, &UA_TYPES[UA_TYPES_STRING]); break;
107
0
    default: break; /* This is not possible if the PublisherId is well-defined */
108
0
    }
109
0
}
110
111
UA_ConnectionManager *
112
0
getCM(UA_EventLoop *el, UA_String protocol) {
113
0
    for(UA_EventSource *es = el->eventSources; es != NULL; es = es->next) {
114
0
        if(es->eventSourceType != UA_EVENTSOURCETYPE_CONNECTIONMANAGER)
115
0
            continue;
116
0
        UA_ConnectionManager *cm = (UA_ConnectionManager*)es;
117
0
        if(UA_String_equal(&protocol, &cm->protocol))
118
0
            return cm;
119
0
    }
120
0
    return NULL;
121
0
}
122
123
static enum ZIP_CMP
124
0
cmpReserveId(const void *a, const void *b) {
125
0
    const UA_ReserveId *aa = (const UA_ReserveId*)a;
126
0
    const UA_ReserveId *bb = (const UA_ReserveId*)b;
127
0
    if(aa->id != bb->id)
128
0
        return (aa->id < bb->id) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
129
0
    if(aa->reserveIdType != bb->reserveIdType)
130
0
        return (aa->reserveIdType < bb->reserveIdType) ? ZIP_CMP_LESS : ZIP_CMP_MORE;
131
0
    return (enum ZIP_CMP)UA_order(&aa->transportProfileUri,
132
0
                                  &bb->transportProfileUri, &UA_TYPES[UA_TYPES_STRING]);
133
0
}
134
135
ZIP_FUNCTIONS(UA_ReserveIdTree, UA_ReserveId, treeEntry, UA_ReserveId, id, cmpReserveId)
136
137
static UA_ReserveId *
138
UA_ReserveId_new(UA_UInt16 id, UA_String transportProfileUri,
139
0
                 UA_ReserveIdType reserveIdType, UA_NodeId sessionId) {
140
0
    UA_ReserveId *reserveId = (UA_ReserveId *)UA_calloc(1, sizeof(UA_ReserveId));
141
0
    if(!reserveId)
142
0
        return NULL;
143
0
    reserveId->id = id;
144
0
    reserveId->reserveIdType = reserveIdType;
145
0
    UA_String_copy(&transportProfileUri, &reserveId->transportProfileUri);
146
0
    reserveId->sessionId = sessionId;
147
0
    return reserveId;
148
0
}
149
150
static UA_Boolean
151
UA_ReserveId_isFree(UA_PubSubManager *psm, UA_UInt16 id, UA_String transportProfileUri,
152
0
                    UA_ReserveIdType reserveIdType) {
153
    /* Is the id already in use? */
154
0
    UA_ReserveId compare;
155
0
    compare.id = id;
156
0
    compare.reserveIdType = reserveIdType;
157
0
    compare.transportProfileUri = transportProfileUri;
158
0
    if(ZIP_FIND(UA_ReserveIdTree, &psm->reserveIds, &compare))
159
0
        return false;
160
161
0
    UA_PubSubConnection *tmpConnection;
162
0
    TAILQ_FOREACH(tmpConnection, &psm->connections, listEntry) {
163
0
        UA_WriterGroup *writerGroup;
164
0
        LIST_FOREACH(writerGroup, &tmpConnection->writerGroups, listEntry) {
165
0
            if(reserveIdType == UA_WRITER_GROUP) {
166
0
                if(UA_String_equal(&tmpConnection->config.transportProfileUri,
167
0
                                   &transportProfileUri) &&
168
0
                   writerGroup->config.writerGroupId == id)
169
0
                    return false;
170
            /* reserveIdType == UA_DATA_SET_WRITER */
171
0
            } else {
172
0
                UA_DataSetWriter *currentWriter;
173
0
                LIST_FOREACH(currentWriter, &writerGroup->writers, listEntry) {
174
0
                    if(UA_String_equal(&tmpConnection->config.transportProfileUri,
175
0
                                       &transportProfileUri) &&
176
0
                       currentWriter->config.dataSetWriterId == id)
177
0
                        return false;
178
0
                }
179
0
            }
180
0
        }
181
0
    }
182
0
    return true;
183
0
}
184
185
static UA_UInt16
186
UA_ReserveId_createId(UA_PubSubManager *psm,  UA_NodeId sessionId,
187
0
                      UA_String transportProfileUri, UA_ReserveIdType reserveIdType) {
188
    /* Total number of possible Ids */
189
0
    UA_UInt16 numberOfIds = 0x8000;
190
    /* Contains next possible free Id */
191
0
    static UA_UInt16 next_id_writerGroup = UA_RESERVEID_FIRST_ID;
192
0
    static UA_UInt16 next_id_writer = UA_RESERVEID_FIRST_ID;
193
0
    UA_UInt16 next_id;
194
0
    UA_Boolean is_free = false;
195
196
0
    if(reserveIdType == UA_WRITER_GROUP)
197
0
        next_id = next_id_writerGroup;
198
0
    else
199
0
        next_id = next_id_writer;
200
201
0
    for(;numberOfIds > 0;numberOfIds--) {
202
0
        if(next_id < UA_RESERVEID_FIRST_ID)
203
0
            next_id = UA_RESERVEID_FIRST_ID;
204
0
        is_free = UA_ReserveId_isFree(psm, next_id, transportProfileUri, reserveIdType);
205
0
        if(is_free)
206
0
            break;
207
0
        next_id++;
208
0
    }
209
0
    if(!is_free) {
210
0
        UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB,
211
0
                     "PubSub ReserveId creation failed. No free ID could be found.");
212
0
        return 0;
213
0
    }
214
215
0
    if(reserveIdType == UA_WRITER_GROUP)
216
0
        next_id_writerGroup = (UA_UInt16)(next_id + 1);
217
0
    else
218
0
        next_id_writer = (UA_UInt16)(next_id + 1);
219
220
0
    UA_ReserveId *reserveId =
221
0
        UA_ReserveId_new(next_id, transportProfileUri, reserveIdType, sessionId);
222
0
    if(!reserveId)
223
0
        return 0;
224
225
0
    ZIP_INSERT(UA_ReserveIdTree, &psm->reserveIds, reserveId);
226
0
    psm->reserveIdsSize++;
227
0
    return next_id;
228
0
}
229
230
static void *
231
0
removeReserveId(void *context, UA_ReserveId *elem) {
232
0
    UA_String_clear(&elem->transportProfileUri);
233
0
    UA_free(elem);
234
0
    return NULL;
235
0
}
236
237
struct RemoveInactiveReserveIdContext {
238
    UA_PubSubManager *psm;
239
    UA_ReserveIdTree newTree;
240
};
241
242
/* Remove ReserveIds that are not attached to any session */
243
static void *
244
0
removeInactiveReserveId(void *context, UA_ReserveId *elem) {
245
0
    struct RemoveInactiveReserveIdContext *ctx =
246
0
        (struct RemoveInactiveReserveIdContext*)context;
247
248
0
    if(UA_NodeId_equal(&ctx->psm->sc.server->adminSession.sessionId, &elem->sessionId))
249
0
        goto still_active;
250
251
0
    session_list_entry *session;
252
0
    LIST_FOREACH(session, &ctx->psm->sc.server->sessions, pointers) {
253
0
        if(UA_NodeId_equal(&session->session.sessionId, &elem->sessionId))
254
0
            goto still_active;
255
0
    }
256
257
0
    ctx->psm->reserveIdsSize--;
258
0
    UA_String_clear(&elem->transportProfileUri);
259
0
    UA_free(elem);
260
0
    return NULL;
261
262
0
 still_active:
263
0
    ZIP_INSERT(UA_ReserveIdTree, &ctx->newTree, elem);
264
0
    return NULL;
265
0
}
266
267
void
268
0
UA_PubSubManager_freeIds(UA_PubSubManager *psm) {
269
0
    struct RemoveInactiveReserveIdContext removeCtx;
270
0
    removeCtx.psm = psm;
271
0
    removeCtx.newTree.root = NULL;
272
0
    ZIP_ITER(UA_ReserveIdTree, &psm->reserveIds,
273
0
             removeInactiveReserveId, &removeCtx);
274
0
    psm->reserveIds = removeCtx.newTree;
275
0
}
276
277
UA_StatusCode
278
UA_PubSubManager_reserveIds(UA_PubSubManager *psm, UA_NodeId sessionId, UA_UInt16 numRegWriterGroupIds,
279
                            UA_UInt16 numRegDataSetWriterIds, UA_String transportProfileUri,
280
0
                            UA_UInt16 **writerGroupIds, UA_UInt16 **dataSetWriterIds) {
281
0
    UA_PubSubManager_freeIds(psm);
282
283
    /* Check the validation of the transportProfileUri */
284
0
    UA_String profile_1 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp");
285
0
    UA_String profile_2 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json");
286
0
    UA_String profile_3 = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
287
0
    if(!UA_String_equal(&transportProfileUri, &profile_1) &&
288
0
       !UA_String_equal(&transportProfileUri, &profile_2) &&
289
0
       !UA_String_equal(&transportProfileUri, &profile_3)) {
290
0
        UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB,
291
0
                     "PubSub ReserveId creation failed. No valid transport profile uri.");
292
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
293
0
    }
294
0
    *writerGroupIds = (UA_UInt16*)UA_Array_new(numRegWriterGroupIds, &UA_TYPES[UA_TYPES_UINT16]);
295
0
    *dataSetWriterIds = (UA_UInt16*)UA_Array_new(numRegDataSetWriterIds, &UA_TYPES[UA_TYPES_UINT16]);
296
297
0
    for(int i = 0; i < numRegWriterGroupIds; i++) {
298
0
        (*writerGroupIds)[i] =
299
0
            UA_ReserveId_createId(psm, sessionId, transportProfileUri, UA_WRITER_GROUP);
300
0
    }
301
0
    for(int i = 0; i < numRegDataSetWriterIds; i++) {
302
0
        (*dataSetWriterIds)[i] =
303
0
            UA_ReserveId_createId(psm, sessionId, transportProfileUri, UA_DATA_SET_WRITER);
304
0
    }
305
0
    return UA_STATUSCODE_GOOD;
306
0
}
307
308
/* Calculate the time difference between current time and UTC (00:00) on January
309
 * 1, 2000. */
310
UA_UInt32
311
0
UA_PubSubConfigurationVersionTimeDifference(UA_DateTime now) {
312
0
    UA_UInt32 timeDiffSince2000 = (UA_UInt32)(now - UA_DATETIMESTAMP_2000);
313
0
    return timeDiffSince2000;
314
0
}
315
316
/* Generate a new unique NodeId. This NodeId will be used for the information
317
 * model representation of PubSub entities. */
318
#ifndef UA_ENABLE_PUBSUB_INFORMATIONMODEL
319
void
320
UA_PubSubManager_generateUniqueNodeId(UA_PubSubManager *psm, UA_NodeId *nodeId) {
321
    *nodeId = UA_NODEID_NUMERIC(1, ++psm->uniqueIdCount);
322
}
323
#endif
324
325
UA_Guid
326
0
UA_PubSubManager_generateUniqueGuid(UA_PubSubManager *psm) {
327
0
    while(true) {
328
0
        UA_NodeId testId = UA_NODEID_GUID(1, UA_Guid_random());
329
0
        const UA_Node *testNode = UA_NODESTORE_GET(psm->sc.server, &testId);
330
0
        if(!testNode)
331
0
            return testId.identifier.guid;
332
0
        UA_NODESTORE_RELEASE(psm->sc.server, testNode);
333
0
    }
334
0
}
335
336
static UA_UInt64
337
551
generateRandomUInt64(void) {
338
551
    UA_UInt64 id = 0;
339
551
    UA_Guid ident = UA_Guid_random();
340
341
551
    id = id + ident.data1;
342
551
    id = (id << 32) + ident.data2;
343
551
    id = (id << 16) + ident.data3;
344
551
    return id;
345
551
}
346
347
UA_StatusCode
348
0
UA_Server_enableAllPubSubComponents(UA_Server *server) {
349
0
    lockServer(server);
350
0
    UA_PubSubManager *psm = getPSM(server);
351
0
    if(!psm) {
352
0
        unlockServer(server);
353
0
        return UA_STATUSCODE_BADINTERNALERROR;
354
0
    }
355
356
0
    UA_StatusCode res = UA_PubSubManager_start(&psm->sc, server);
357
0
    if(res != UA_STATUSCODE_GOOD)
358
0
        return res;
359
360
0
    UA_PubSubConnection *c;
361
0
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
362
0
        UA_WriterGroup *wg;
363
0
        LIST_FOREACH(wg, &c->writerGroups, listEntry) {
364
0
            UA_DataSetWriter *dsw;
365
0
            LIST_FOREACH(dsw, &wg->writers, listEntry) {
366
0
                res |= UA_DataSetWriter_setPubSubState(psm, dsw, UA_PUBSUBSTATE_OPERATIONAL);
367
0
            }
368
0
            res |= UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_OPERATIONAL);
369
0
        }
370
371
0
        UA_ReaderGroup *rg;
372
0
        LIST_FOREACH(rg, &c->readerGroups, listEntry) {
373
0
            UA_DataSetReader *dsr;
374
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
375
0
                UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_OPERATIONAL,
376
0
                                                UA_STATUSCODE_GOOD);
377
0
            }
378
0
            res |= UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_OPERATIONAL);
379
0
        }
380
381
0
        res |= UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_OPERATIONAL);
382
0
    }
383
384
0
    unlockServer(server);
385
0
    return res;
386
0
}
387
388
static void
389
300
disableAllPubSubComponents(UA_PubSubManager *psm) {
390
300
    UA_PubSubConnection *c;
391
300
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
392
0
        UA_WriterGroup *wg;
393
0
        LIST_FOREACH(wg, &c->writerGroups, listEntry) {
394
0
            UA_DataSetWriter *dsw;
395
0
            LIST_FOREACH(dsw, &wg->writers, listEntry) {
396
0
                UA_DataSetWriter_setPubSubState(psm, dsw, UA_PUBSUBSTATE_DISABLED);
397
0
            }
398
0
            UA_WriterGroup_setPubSubState(psm, wg, UA_PUBSUBSTATE_DISABLED);
399
0
        }
400
401
0
        UA_ReaderGroup *rg;
402
0
        LIST_FOREACH(rg, &c->readerGroups, listEntry) {
403
0
            UA_DataSetReader *dsr;
404
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
405
0
                UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_DISABLED,
406
0
                                                UA_STATUSCODE_BADSHUTDOWN);
407
0
            }
408
0
            UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED);
409
0
        }
410
411
0
        UA_PubSubConnection_setPubSubState(psm, c, UA_PUBSUBSTATE_DISABLED);
412
0
    }
413
300
}
414
415
void
416
0
UA_Server_disableAllPubSubComponents(UA_Server *server) {
417
0
    lockServer(server);
418
0
    UA_PubSubManager *psm = getPSM(server);
419
0
    if(psm)
420
0
        UA_PubSubManager_stop(&psm->sc); /* Calls disableAll internally */
421
0
    unlockServer(server);
422
0
}
423
424
static UA_StatusCode
425
getPubSubComponentType(UA_PubSubManager *psm, UA_NodeId componentId,
426
0
                       UA_PubSubComponentType *outType) {
427
0
    UA_PubSubConnection *c;
428
0
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
429
0
        if(UA_NodeId_equal(&componentId, &c->head.identifier)) {
430
0
            *outType = c->head.componentType;
431
0
            return UA_STATUSCODE_GOOD;
432
0
        }
433
434
0
        UA_WriterGroup *wg;
435
0
        LIST_FOREACH(wg, &c->writerGroups, listEntry) {
436
0
            if(UA_NodeId_equal(&componentId, &wg->head.identifier)) {
437
0
                *outType = wg->head.componentType;
438
0
                return UA_STATUSCODE_GOOD;
439
0
            }
440
441
0
            UA_DataSetWriter *dsw;
442
0
            LIST_FOREACH(dsw, &wg->writers, listEntry) {
443
0
                if(UA_NodeId_equal(&componentId, &dsw->head.identifier)) {
444
0
                    *outType = dsw->head.componentType;
445
0
                    return UA_STATUSCODE_GOOD;
446
0
                }
447
0
            }
448
0
        }
449
450
0
        UA_ReaderGroup *rg;
451
0
        LIST_FOREACH(rg, &c->readerGroups, listEntry) {
452
0
            if(UA_NodeId_equal(&componentId, &rg->head.identifier)) {
453
0
                *outType = rg->head.componentType;
454
0
                return UA_STATUSCODE_GOOD;
455
0
            }
456
457
0
            UA_DataSetReader *dsr;
458
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
459
0
                if(UA_NodeId_equal(&componentId, &dsr->head.identifier)) {
460
0
                    *outType = dsr->head.componentType;
461
0
                    return UA_STATUSCODE_GOOD;
462
0
                }
463
0
            }
464
0
        }
465
0
    }
466
467
0
    return UA_STATUSCODE_BADNOTFOUND;
468
0
}
469
470
UA_StatusCode
471
UA_Server_getPubSubComponentType(UA_Server *server, UA_NodeId componentId,
472
0
                                 UA_PubSubComponentType *outType) {
473
0
    if(!outType)
474
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
475
0
    lockServer(server);
476
0
    UA_PubSubManager *psm = getPSM(server);
477
0
    UA_StatusCode res = (psm) ?
478
0
        getPubSubComponentType(psm, componentId, outType) : UA_STATUSCODE_BADINTERNALERROR;
479
0
    unlockServer(server);
480
0
    return res;
481
0
}
482
483
static UA_StatusCode
484
getPubSubComponentParent(UA_PubSubManager *psm, UA_NodeId componentId,
485
0
                         UA_NodeId *outParent) {
486
0
    UA_PubSubConnection *c;
487
0
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
488
0
        if(UA_NodeId_equal(&componentId, &c->head.identifier))
489
0
            return UA_STATUSCODE_BADNOTSUPPORTED;
490
491
0
        UA_WriterGroup *wg;
492
0
        LIST_FOREACH(wg, &c->writerGroups, listEntry) {
493
0
            if(UA_NodeId_equal(&componentId, &wg->head.identifier))
494
0
                return UA_NodeId_copy(&c->head.identifier, outParent);
495
496
0
            UA_DataSetWriter *dsw;
497
0
            LIST_FOREACH(dsw, &wg->writers, listEntry) {
498
0
                if(UA_NodeId_equal(&componentId, &dsw->head.identifier))
499
0
                    return UA_NodeId_copy(&wg->head.identifier, outParent);
500
0
            }
501
0
        }
502
503
0
        UA_ReaderGroup *rg;
504
0
        LIST_FOREACH(rg, &c->readerGroups, listEntry) {
505
0
            if(UA_NodeId_equal(&componentId, &rg->head.identifier))
506
0
                return UA_NodeId_copy(&c->head.identifier, outParent);
507
508
0
            UA_DataSetReader *dsr;
509
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
510
0
                if(UA_NodeId_equal(&componentId, &dsr->head.identifier))
511
0
                    return UA_NodeId_copy(&rg->head.identifier, outParent);
512
0
            }
513
0
        }
514
0
    }
515
516
0
    return UA_STATUSCODE_BADNOTFOUND;
517
0
}
518
519
UA_StatusCode
520
UA_Server_getPubSubComponentParent(UA_Server *server, UA_NodeId componentId,
521
0
                                   UA_NodeId *outParent) {
522
0
    if(!outParent)
523
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
524
0
    lockServer(server);
525
0
    UA_PubSubManager *psm = getPSM(server);
526
0
    UA_StatusCode res = (psm) ?
527
0
        getPubSubComponentParent(psm, componentId, outParent) : UA_STATUSCODE_BADINTERNALERROR;
528
0
    unlockServer(server);
529
0
    return res;
530
0
}
531
532
static UA_StatusCode
533
getPubSubComponentChildren(UA_PubSubManager *psm, UA_NodeId componentId,
534
0
                           size_t *outChildrenSize, UA_NodeId **outChildren) {
535
0
    UA_WriterGroup *wg;
536
0
    UA_ReaderGroup *rg;
537
0
    UA_DataSetWriter *dsw;
538
0
    UA_DataSetReader *dsr;
539
0
    UA_PubSubConnection *c;
540
541
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
542
0
    TAILQ_FOREACH(c, &psm->connections, listEntry) {
543
0
        if(UA_NodeId_equal(&componentId, &c->head.identifier)) {
544
            /* Count the children */
545
0
            size_t children = 0;
546
0
            LIST_FOREACH(wg, &c->writerGroups, listEntry)
547
0
                children++;
548
0
            LIST_FOREACH(rg, &c->readerGroups, listEntry)
549
0
                children++;
550
551
            /* Empty array? */
552
0
            if(children == 0) {
553
0
                *outChildren = NULL;
554
0
                *outChildrenSize = 0;
555
0
                return UA_STATUSCODE_GOOD;
556
0
            }
557
558
            /* Allocate the array */
559
0
            *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId));
560
0
            if(!*outChildren)
561
0
                return UA_STATUSCODE_BADOUTOFMEMORY;
562
0
            *outChildrenSize = children;
563
564
            /* Copy the NodeIds */
565
0
            size_t pos = 0;
566
0
            LIST_FOREACH(wg, &c->writerGroups, listEntry) {
567
0
                res |= UA_NodeId_copy(&wg->head.identifier, (*outChildren) + pos);
568
0
                pos++;
569
0
            }
570
0
            LIST_FOREACH(rg, &c->readerGroups, listEntry) {
571
0
                res |= UA_NodeId_copy(&rg->head.identifier, (*outChildren) + pos);
572
0
                pos++;
573
0
            }
574
0
            goto out;
575
0
        }
576
577
0
        LIST_FOREACH(wg, &c->writerGroups, listEntry) {
578
0
            if(UA_NodeId_equal(&componentId, &wg->head.identifier)) {
579
                /* Count the children */
580
0
                size_t children = 0;
581
0
                LIST_FOREACH(dsw, &wg->writers, listEntry)
582
0
                    children++;
583
584
                /* Empty array? */
585
0
                if(children == 0) {
586
0
                    *outChildren = NULL;
587
0
                    *outChildrenSize = 0;
588
0
                    return UA_STATUSCODE_GOOD;
589
0
                }
590
591
                /* Allocate the array */
592
0
                *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId));
593
0
                if(!*outChildren)
594
0
                    return UA_STATUSCODE_BADOUTOFMEMORY;
595
0
                *outChildrenSize = children;
596
597
                /* Copy the NodeIds */
598
0
                size_t pos = 0;
599
0
                LIST_FOREACH(dsw, &wg->writers, listEntry) {
600
0
                    res |= UA_NodeId_copy(&dsw->head.identifier, (*outChildren) + pos);
601
0
                    pos++;
602
0
                }
603
0
                goto out;
604
0
            }
605
606
            /* DataSetWriter have no children (with a state machine) */
607
0
            LIST_FOREACH(dsw, &wg->writers, listEntry) {
608
0
                if(UA_NodeId_equal(&componentId, &dsw->head.identifier))
609
0
                    return UA_STATUSCODE_BADNOTSUPPORTED;
610
0
            }
611
0
        }
612
613
0
        LIST_FOREACH(rg, &c->readerGroups, listEntry) {
614
0
            if(UA_NodeId_equal(&componentId, &rg->head.identifier)) {
615
                /* Count the children */
616
0
                size_t children = 0;
617
0
                LIST_FOREACH(dsr, &rg->readers, listEntry)
618
0
                    children++;
619
620
                /* Empty array? */
621
0
                if(children == 0) {
622
0
                    *outChildren = NULL;
623
0
                    *outChildrenSize = 0;
624
0
                    return UA_STATUSCODE_GOOD;
625
0
                }
626
627
                /* Allocate the array */
628
0
                *outChildren = (UA_NodeId*)UA_calloc(children, sizeof(UA_NodeId));
629
0
                if(!*outChildren)
630
0
                    return UA_STATUSCODE_BADOUTOFMEMORY;
631
0
                *outChildrenSize = children;
632
633
                /* Copy the NodeIds */
634
0
                size_t pos = 0;
635
0
                LIST_FOREACH(dsr, &rg->readers, listEntry) {
636
0
                    res |= UA_NodeId_copy(&dsr->head.identifier, (*outChildren) + pos);
637
0
                    pos++;
638
0
                }
639
0
                goto out;
640
0
            }
641
642
            /* DataSetReader have no children (with a state machine) */
643
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
644
0
                if(UA_NodeId_equal(&componentId, &dsr->head.identifier))
645
0
                    return UA_STATUSCODE_BADNOTSUPPORTED;
646
0
            }
647
0
        }
648
0
    }
649
650
0
    return UA_STATUSCODE_BADNOTFOUND;
651
652
0
 out:
653
0
    if(res != UA_STATUSCODE_GOOD)
654
0
        UA_Array_delete(*outChildren, *outChildrenSize, &UA_TYPES[UA_TYPES_NODEID]);
655
0
    return res;
656
0
}
657
658
UA_StatusCode
659
UA_Server_getPubSubComponentChildren(UA_Server *server, UA_NodeId componentId,
660
0
                                     size_t *outChildrenSize, UA_NodeId **outChildren) {
661
0
    if(!outChildrenSize || !outChildren)
662
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
663
0
    lockServer(server);
664
0
    UA_PubSubManager *psm = getPSM(server);
665
0
    UA_StatusCode res = (psm) ?
666
0
        getPubSubComponentChildren(psm, componentId,
667
0
                                   outChildrenSize, outChildren) : UA_STATUSCODE_BADINTERNALERROR;
668
0
    unlockServer(server);
669
0
    return res;
670
0
}
671
672
void
673
600
UA_PubSubManager_setState(UA_PubSubManager *psm, UA_LifecycleState state) {
674
600
    if(state == UA_LIFECYCLESTATE_STOPPED)
675
300
        state = UA_LIFECYCLESTATE_STOPPING;
676
677
    /* Check if all connections are closed if we are started */
678
600
    if(state == UA_LIFECYCLESTATE_STOPPING) {
679
300
        UA_PubSubConnection *c;
680
300
        TAILQ_FOREACH(c, &psm->connections, listEntry) {
681
0
            if(c->sendChannel != 0 || c->recvChannelsSize > 0)
682
0
                goto set_state;
683
684
0
            UA_WriterGroup *wg;
685
0
            LIST_FOREACH(wg, &c->writerGroups, listEntry) {
686
0
                if(wg->sendChannel > 0)
687
0
                    goto set_state;
688
0
            }
689
690
0
            UA_ReaderGroup *rg;
691
0
            LIST_FOREACH(rg, &c->readerGroups, listEntry) {
692
0
                if(rg->recvChannelsSize > 0)
693
0
                    goto set_state;
694
0
            }
695
0
        }
696
697
        /* No open connections -> stopped */
698
300
        state = UA_LIFECYCLESTATE_STOPPED;
699
300
    }
700
701
600
 set_state:
702
600
    if(state == psm->sc.state)
703
0
        return;
704
600
    psm->sc.state = state;
705
600
    if(psm->sc.notifyState)
706
0
        psm->sc.notifyState(&psm->sc, state);
707
708
    /* When we just started, trigger all connections to go from PAUSED to
709
     * OPERATIONAL */
710
600
    if(state == UA_LIFECYCLESTATE_STARTED) {
711
300
        UA_PubSubConnection *c;
712
300
        TAILQ_FOREACH(c, &psm->connections, listEntry) {
713
0
            UA_PubSubConnection_setPubSubState(psm, c, c->head.state);
714
0
        }
715
300
    }
716
600
}
717
718
static UA_StatusCode
719
300
UA_PubSubManager_start(UA_ServerComponent *sc, UA_Server *server) {
720
300
    UA_PubSubManager *psm = (UA_PubSubManager*)sc;
721
300
    if(psm->sc.state == UA_LIFECYCLESTATE_STOPPING) {
722
0
        UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB,
723
0
                     "The PubSubManager is still stopping");
724
0
        return UA_STATUSCODE_BADINTERNALERROR;
725
0
    }
726
727
    /* Re-cache for the case that the configuration has been updated */
728
300
    psm->logging = server->config.logging;
729
730
300
    UA_PubSubManager_setState(psm, UA_LIFECYCLESTATE_STARTED);
731
732
300
    return UA_STATUSCODE_GOOD;
733
300
}
734
735
static void
736
300
UA_PubSubManager_stop(UA_ServerComponent *sc) {
737
300
    UA_PubSubManager *psm = (UA_PubSubManager*)sc;
738
300
    disableAllPubSubComponents(psm);
739
300
    UA_PubSubManager_setState(psm, UA_LIFECYCLESTATE_STOPPED);
740
300
}
741
742
UA_StatusCode
743
551
UA_PubSubManager_clear(UA_PubSubManager *psm) {
744
551
    if(psm->sc.state != UA_LIFECYCLESTATE_STOPPED) {
745
0
        UA_LOG_ERROR(psm->logging, UA_LOGCATEGORY_PUBSUB,
746
0
                     "Cannot delete the PubSubManager because "
747
0
                     "it is not stopped");
748
0
        return UA_STATUSCODE_BADINTERNALERROR;
749
0
    }
750
751
551
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
752
753
    /* Remove Connections - this also remove WriterGroups and ReaderGroups */
754
551
    UA_PubSubConnection *c, *tmpC;
755
551
    TAILQ_FOREACH_SAFE(c, &psm->connections, listEntry, tmpC) {
756
0
        UA_PubSubConnection_delete(psm, c);
757
0
    }
758
759
    /* Remove the DataSets */
760
551
    UA_PublishedDataSet *tmpPDS1, *tmpPDS2;
761
551
    TAILQ_FOREACH_SAFE(tmpPDS1, &psm->publishedDataSets, listEntry, tmpPDS2) {
762
0
        UA_PublishedDataSet_remove(psm, tmpPDS1);
763
0
    }
764
765
    /* Remove the ReserveIds*/
766
551
    ZIP_ITER(UA_ReserveIdTree, &psm->reserveIds, removeReserveId, NULL);
767
551
    psm->reserveIdsSize = 0;
768
769
    /* Delete subscribed datasets */
770
551
    UA_SubscribedDataSet *tmpSDS1, *tmpSDS2;
771
551
    TAILQ_FOREACH_SAFE(tmpSDS1, &psm->subscribedDataSets, listEntry, tmpSDS2) {
772
0
        UA_SubscribedDataSet_remove(psm, tmpSDS1);
773
0
    }
774
775
#ifdef UA_ENABLE_PUBSUB_SKS
776
    /* Remove the SecurityGroups */
777
    UA_SecurityGroup *tmpSG1, *tmpSG2;
778
    TAILQ_FOREACH_SAFE(tmpSG1, &psm->securityGroups, listEntry, tmpSG2) {
779
        UA_SecurityGroup_remove(psm, tmpSG1);
780
    }
781
782
    /* Remove the keyStorages */
783
    UA_PubSubKeyStorage *ks, *ksTmp;
784
    LIST_FOREACH_SAFE(ks, &psm->pubSubKeyList, keyStorageList, ksTmp) {
785
        UA_PubSubKeyStorage_delete(psm, ks);
786
    }
787
#endif
788
789
551
    return UA_STATUSCODE_GOOD;
790
551
}
791
792
UA_ServerComponent *
793
551
UA_PubSubManager_new(UA_Server *server) {
794
551
    UA_PubSubManager *psm = (UA_PubSubManager*)UA_calloc(1, sizeof(UA_PubSubManager));
795
551
    if(!psm)
796
0
        return NULL;
797
798
551
    psm->sc.server = server;
799
551
    psm->sc.name = UA_STRING("pubsub");
800
551
    psm->sc.start = UA_PubSubManager_start;
801
551
    psm->sc.stop = UA_PubSubManager_stop;
802
551
    psm->sc.clear = (UA_StatusCode (*)(UA_ServerComponent *))UA_PubSubManager_clear;
803
804
    /* Set the logging shortcut */
805
551
    psm->logging = server->config.logging;
806
807
    /* TODO: Using the Mac address to generate the defaultPublisherId.
808
     * In the future, this can be retrieved from the Eventloop. */
809
551
    psm->defaultPublisherId = generateRandomUInt64();
810
811
551
    TAILQ_INIT(&psm->connections);
812
551
    TAILQ_INIT(&psm->publishedDataSets);
813
551
    TAILQ_INIT(&psm->subscribedDataSets);
814
815
#ifdef UA_ENABLE_PUBSUB_SKS
816
    TAILQ_INIT(&psm->securityGroups);
817
#endif
818
819
551
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
820
    /* Build PubSub information model */
821
551
    initPubSubNS0(server);
822
551
#endif
823
824
551
    return &psm->sc;
825
551
}
826
827
#endif /* UA_ENABLE_PUBSUB */