Coverage Report

Created: 2026-05-30 06:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541_15/src/pubsub/ua_pubsub_readergroup.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner)
6
 * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
7
 * Copyright (c) 2019 Kalycito Infotech Private Limited
8
 * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes)
9
 * Copyright (c) 2022 Linutronix GmbH (Author: Muddasir Shakil)
10
 */
11
12
#include <open62541/server_pubsub.h>
13
#include "ua_pubsub_internal.h"
14
15
#ifdef UA_ENABLE_PUBSUB /* conditional compilation */
16
17
#ifdef UA_ENABLE_PUBSUB_SKS
18
#include "ua_pubsub_keystorage.h"
19
#endif
20
21
UA_ReaderGroup *
22
0
UA_ReaderGroup_find(UA_PubSubManager *psm, const UA_NodeId id) {
23
0
    if(!psm)
24
0
        return NULL;
25
0
    UA_PubSubConnection *psc;
26
0
    TAILQ_FOREACH(psc, &psm->connections, listEntry) {
27
0
        UA_ReaderGroup *rg;
28
0
        LIST_FOREACH(rg, &psc->readerGroups, listEntry) {
29
0
            if(UA_NodeId_equal(&id, &rg->head.identifier))
30
0
                return rg;
31
0
        }
32
0
    }
33
0
    return NULL;
34
0
}
35
36
/* ReaderGroup Config Handling */
37
38
UA_StatusCode
39
UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
40
0
                          UA_ReaderGroupConfig *dst) {
41
0
    memcpy(dst, src, sizeof(UA_ReaderGroupConfig));
42
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
43
0
    res |= UA_String_copy(&src->name, &dst->name);
44
0
    res |= UA_KeyValueMap_copy(&src->groupProperties, &dst->groupProperties);
45
0
    res |= UA_String_copy(&src->securityGroupId, &dst->securityGroupId);
46
0
    res |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
47
0
    if(res != UA_STATUSCODE_GOOD)
48
0
        UA_ReaderGroupConfig_clear(dst);
49
0
    return res;
50
0
}
51
52
void
53
0
UA_ReaderGroupConfig_clear(UA_ReaderGroupConfig *readerGroupConfig) {
54
0
    UA_String_clear(&readerGroupConfig->name);
55
0
    UA_KeyValueMap_clear(&readerGroupConfig->groupProperties);
56
0
    UA_String_clear(&readerGroupConfig->securityGroupId);
57
0
    UA_ExtensionObject_clear(&readerGroupConfig->transportSettings);
58
0
}
59
60
61
#ifdef UA_ENABLE_PUBSUB_SKS
62
static UA_StatusCode
63
readerGroupAttachSKSKeystorage(UA_PubSubManager *psm, UA_ReaderGroup *rg) {
64
    /* No SecurityGroup defined */
65
    if(UA_String_isEmpty(&rg->config.securityGroupId) || !rg->config.securityPolicy)
66
        return UA_STATUSCODE_GOOD;
67
68
    /* KeyStorage already connected */
69
    if(rg->keyStorage)
70
        return UA_STATUSCODE_GOOD;
71
72
    /* Does the key storage already exist? */
73
    rg->keyStorage = UA_PubSubKeyStorage_find(psm, rg->config.securityGroupId);
74
    if(rg->keyStorage) {
75
        rg->keyStorage->referenceCount++; /* Increase the ref count */
76
        return UA_STATUSCODE_GOOD;
77
    }
78
79
    /* Create a new key storage */
80
    rg->keyStorage = (UA_PubSubKeyStorage *)UA_calloc(1, sizeof(UA_PubSubKeyStorage));
81
    if(!rg->keyStorage)
82
        return UA_STATUSCODE_BADOUTOFMEMORY;
83
84
    /* Initialize the KeyStorage */
85
    UA_StatusCode res =
86
        UA_PubSubKeyStorage_init(psm, rg->keyStorage, &rg->config.securityGroupId,
87
                                 rg->config.securityPolicy, 0, 0);
88
    if(res != UA_STATUSCODE_GOOD) {
89
        UA_PubSubKeyStorage_delete(psm, rg->keyStorage);
90
        rg->keyStorage = NULL;
91
        return res;
92
    }
93
94
    /* Increase the ref count */
95
    rg->keyStorage->referenceCount++;
96
    return UA_STATUSCODE_GOOD;
97
}
98
#endif
99
100
UA_StatusCode
101
UA_ReaderGroup_create(UA_PubSubManager *psm, UA_NodeId connectionId,
102
                      const UA_ReaderGroupConfig *rgc,
103
0
                      UA_NodeId *readerGroupId) {
104
    /* Check for valid readergroup configuration */
105
0
    if(!psm || !rgc)
106
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
107
108
    /* Search the connection by the given connectionIdentifier */
109
0
    UA_PubSubConnection *c = UA_PubSubConnection_find(psm, connectionId);
110
0
    if(!c)
111
0
        return UA_STATUSCODE_BADNOTFOUND;
112
113
    /* Allocate memory for new reader group and add settings */
114
0
    UA_ReaderGroup *newGroup = (UA_ReaderGroup *)UA_calloc(1, sizeof(UA_ReaderGroup));
115
0
    if(!newGroup)
116
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
117
118
0
    newGroup->head.componentType = UA_PUBSUBCOMPONENT_READERGROUP;
119
0
    newGroup->linkedConnection = c;
120
121
    /* Deep copy of the config */
122
0
    UA_StatusCode retval = UA_ReaderGroupConfig_copy(rgc, &newGroup->config);
123
0
    if(retval != UA_STATUSCODE_GOOD) {
124
0
        UA_free(newGroup);
125
0
        return retval;
126
0
    }
127
128
    /* Add to the connection */
129
0
    LIST_INSERT_HEAD(&c->readerGroups, newGroup, listEntry);
130
0
    c->readerGroupsSize++;
131
132
    /* Cache the log string */
133
0
    char tmpLogIdStr[128];
134
0
    mp_snprintf(tmpLogIdStr, 128, "%SReaderGroup %N\t| ",
135
0
                c->head.logIdString, newGroup->head.identifier);
136
0
    newGroup->head.logIdString = UA_STRING_ALLOC(tmpLogIdStr);
137
138
    /* Validate the connection settings */
139
0
    retval = UA_ReaderGroup_connect(psm, newGroup, true);
140
0
    if(retval != UA_STATUSCODE_GOOD) {
141
0
        UA_LOG_ERROR_PUBSUB(psm->logging, newGroup,
142
0
                            "Could not validate the connection parameters");
143
0
        UA_ReaderGroup_remove(psm, newGroup);
144
0
        return retval;
145
0
    }
146
147
    /* Attach SKS Keystorage */
148
#ifdef UA_ENABLE_PUBSUB_SKS
149
    if(rgc->securityMode == UA_MESSAGESECURITYMODE_SIGN ||
150
       rgc->securityMode == UA_MESSAGESECURITYMODE_SIGNANDENCRYPT) {
151
        retval = readerGroupAttachSKSKeystorage(psm, newGroup);
152
        if(retval != UA_STATUSCODE_GOOD) {
153
            UA_LOG_ERROR_PUBSUB(psm->logging, newGroup,
154
                                "Attaching the SKS KeyStorage failed");
155
            UA_ReaderGroup_remove(psm, newGroup);
156
            return retval;
157
        }
158
    }
159
#endif
160
161
    /* Create information model representation */
162
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
163
0
    retval |= addReaderGroupRepresentation(psm->sc.server, newGroup);
164
0
    if(retval != UA_STATUSCODE_GOOD) {
165
0
        UA_ReaderGroup_remove(psm, newGroup);
166
0
        return retval;
167
0
    }
168
#else
169
    UA_PubSubManager_generateUniqueNodeId(psm, &newGroup->head.identifier);
170
#endif
171
172
    /* Notify the application that a new ReaderGroup was created.
173
     * This may internally adjust the config */
174
0
    UA_Server *server = psm->sc.server;
175
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
176
0
        retval = server->config.pubSubConfig.
177
0
            componentLifecycleCallback(server, newGroup->head.identifier,
178
0
                                       UA_PUBSUBCOMPONENT_READERGROUP, false);
179
0
        if(retval != UA_STATUSCODE_GOOD) {
180
0
            UA_ReaderGroup_remove(psm, newGroup);
181
0
            return retval;
182
0
        }
183
0
    }
184
185
0
    UA_LOG_INFO_PUBSUB(psm->logging, newGroup, "ReaderGroup created (State: %s)",
186
0
                   UA_PubSubState_name(newGroup->head.state));
187
188
    /* Trigger the connection state machine. It might open a socket only when
189
     * the first ReaderGroup is attached. */
190
0
    if(rgc->enabled)
191
0
        UA_PubSubConnection_setPubSubState(psm, c, c->head.state);
192
193
    /* Copying a numeric NodeId always succeeds */
194
0
    if(readerGroupId)
195
0
        UA_NodeId_copy(&newGroup->head.identifier, readerGroupId);
196
197
    /* Enable the ReaderGroup immediately if the enabled flag is set */
198
0
    if(rgc->enabled)
199
0
        UA_ReaderGroup_setPubSubState(psm, newGroup, UA_PUBSUBSTATE_OPERATIONAL);
200
201
0
    return UA_STATUSCODE_GOOD;
202
0
}
203
204
UA_StatusCode
205
0
UA_ReaderGroup_remove(UA_PubSubManager *psm, UA_ReaderGroup *rg) {
206
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
207
208
0
    UA_PubSubConnection *connection = rg->linkedConnection;
209
0
    UA_assert(connection);
210
211
    /* Check with the application if we can remove */
212
0
    UA_Server *server = psm->sc.server;
213
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
214
0
        UA_StatusCode res = server->config.pubSubConfig.
215
0
            componentLifecycleCallback(server, rg->head.identifier,
216
0
                                       UA_PUBSUBCOMPONENT_READERGROUP, true);
217
0
        if(res != UA_STATUSCODE_GOOD)
218
0
            return res;
219
0
    }
220
221
    /* Disable (and disconnect) and set the deleteFlag. This prevents a
222
     * reconnect and triggers the deletion when the last open socket is
223
     * closed. */
224
0
    rg->deleteFlag = true;
225
0
    UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED);
226
227
0
    UA_DataSetReader *dsr, *tmp_dsr;
228
0
    LIST_FOREACH_SAFE(dsr, &rg->readers, listEntry, tmp_dsr) {
229
0
        UA_DataSetReader_remove(psm, dsr);
230
0
    }
231
232
0
    if(rg->config.securityPolicy && rg->securityPolicyContext) {
233
0
        UA_PubSubSecurityPolicy *sp = rg->config.securityPolicy;
234
0
        sp->deleteGroupContext(sp, rg->securityPolicyContext);
235
0
        rg->securityPolicyContext = NULL;
236
0
    }
237
238
#ifdef UA_ENABLE_PUBSUB_SKS
239
    if(rg->keyStorage) {
240
        UA_PubSubKeyStorage_detachKeyStorage(psm, rg->keyStorage);
241
        rg->keyStorage = NULL;
242
    }
243
#endif
244
245
0
    if(rg->recvChannelsSize == 0) {
246
        /* Unlink from the connection */
247
0
        LIST_REMOVE(rg, listEntry);
248
0
        connection->readerGroupsSize--;
249
0
        rg->linkedConnection = NULL;
250
251
        /* Actually remove the ReaderGroup */
252
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
253
0
        deleteNode(psm->sc.server, rg->head.identifier, true);
254
0
#endif
255
256
0
        UA_LOG_INFO_PUBSUB(psm->logging, rg, "ReaderGroup deleted");
257
258
0
        UA_ReaderGroupConfig_clear(&rg->config);
259
0
        UA_PubSubComponentHead_clear(&rg->head);
260
0
        UA_free(rg);
261
0
    }
262
263
    /* Update the connection state */
264
0
    UA_PubSubConnection_setPubSubState(psm, connection, connection->head.state);
265
266
0
    return UA_STATUSCODE_GOOD;
267
0
}
268
269
UA_StatusCode
270
UA_ReaderGroup_setPubSubState(UA_PubSubManager *psm, UA_ReaderGroup *rg,
271
0
                              UA_PubSubState targetState) {
272
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
273
274
0
    if(rg->deleteFlag && targetState != UA_PUBSUBSTATE_DISABLED) {
275
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
276
0
                              "The ReaderGroup is being deleted. Can only be disabled.");
277
0
        return UA_STATUSCODE_BADINTERNALERROR;
278
0
    }
279
280
    /* Callback to modify the WriterGroup config and change the targetState
281
     * before the state machine executes */
282
0
    UA_Server *server = psm->sc.server;
283
0
    if(server->config.pubSubConfig.beforeStateChangeCallback) {
284
0
        server->config.pubSubConfig.
285
0
            beforeStateChangeCallback(server, rg->head.identifier, &targetState);
286
0
    }
287
288
0
    UA_StatusCode ret = UA_STATUSCODE_GOOD;
289
0
    UA_PubSubState oldState = rg->head.state;
290
0
    UA_PubSubConnection *connection = rg->linkedConnection;
291
292
    /* Are we doing a top-level state update or recursively? */
293
0
    UA_Boolean isTransient = rg->head.transientState;
294
0
    rg->head.transientState = true;
295
296
    /* Custom state machine */
297
0
    if(rg->config.customStateMachine) {
298
0
        ret = rg->config.customStateMachine(server, rg->head.identifier, rg->config.context,
299
0
                                            &rg->head.state, targetState);
300
0
        goto finalize_state_machine;
301
0
    }
302
303
    /* Internal state machine */
304
0
    switch(targetState) {
305
        /* Disabled or Error */
306
0
    case UA_PUBSUBSTATE_DISABLED:
307
0
    case UA_PUBSUBSTATE_ERROR:
308
0
        rg->head.state = targetState;
309
0
        UA_ReaderGroup_disconnect(rg);
310
0
        rg->hasReceived = false;
311
0
        break;
312
313
        /* Enabled */
314
0
    case UA_PUBSUBSTATE_PAUSED:
315
0
    case UA_PUBSUBSTATE_PREOPERATIONAL:
316
0
    case UA_PUBSUBSTATE_OPERATIONAL:
317
0
        if(psm->sc.state != UA_LIFECYCLESTATE_STARTED) {
318
            /* Avoid repeat warnings */
319
0
            if(oldState != UA_PUBSUBSTATE_PAUSED) {
320
0
                UA_LOG_WARNING_PUBSUB(psm->logging, rg,
321
0
                                      "Cannot enable the ReaderGroup while the "
322
0
                                      "server is not running -> Paused State");
323
0
            }
324
0
            rg->head.state = UA_PUBSUBSTATE_PAUSED;
325
0
            UA_ReaderGroup_disconnect(rg);
326
0
            break;
327
0
        }
328
329
        /* Connection is not operational -> ReaderGroup paused */
330
0
        if(connection->head.state != UA_PUBSUBSTATE_OPERATIONAL) {
331
0
            UA_ReaderGroup_disconnect(rg);
332
0
            rg->head.state = UA_PUBSUBSTATE_PAUSED;
333
0
            break;
334
0
        }
335
336
        /* Connect RG-specific connections. For example for MQTT. */
337
0
        if(UA_ReaderGroup_canConnect(rg))
338
0
            ret = UA_ReaderGroup_connect(psm, rg, false);
339
340
        /* Preoperational until a message was received */
341
0
        rg->head.state = (rg->hasReceived) ?
342
0
            UA_PUBSUBSTATE_OPERATIONAL : UA_PUBSUBSTATE_PREOPERATIONAL;
343
0
        break;
344
345
        /* Unknown case */
346
0
    default:
347
0
        ret = UA_STATUSCODE_BADINTERNALERROR;
348
0
        break;
349
0
    }
350
351
    /* Failure */
352
0
    if(ret != UA_STATUSCODE_GOOD) {
353
0
        rg->head.state = UA_PUBSUBSTATE_ERROR;
354
0
        UA_ReaderGroup_disconnect(rg);
355
0
        rg->hasReceived = false;
356
0
    }
357
358
0
 finalize_state_machine:
359
360
    /* Only the top-level state update (if recursive calls are happening)
361
     * notifies the application and updates Reader and WriterGroups */
362
0
    rg->head.transientState = isTransient;
363
0
    if(rg->head.transientState)
364
0
        return ret;
365
366
    /* No state change has happened */
367
0
    if(rg->head.state == oldState)
368
0
        return ret;
369
370
0
    UA_LOG_INFO_PUBSUB(psm->logging, rg, "%s -> %s",
371
0
                       UA_PubSubState_name(oldState),
372
0
                       UA_PubSubState_name(rg->head.state));
373
374
    /* Inform application about state change */
375
0
    if(server->config.pubSubConfig.stateChangeCallback)
376
0
        server->config.pubSubConfig.
377
0
            stateChangeCallback(server, rg->head.identifier, rg->head.state, ret);
378
379
    /* Children evaluate their state machine after the state change of the parent.
380
     * Keep the current child state as the target state for the child. */
381
0
    UA_DataSetReader *dsr;
382
0
    LIST_FOREACH(dsr, &rg->readers, listEntry) {
383
0
        if(psm->pubSubInitialSetupMode && dsr->config.enabled) {
384
0
            UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_PREOPERATIONAL, UA_STATUSCODE_GOOD);
385
0
        } else {
386
0
            UA_DataSetReader_setPubSubState(psm, dsr, dsr->head.state, UA_STATUSCODE_GOOD);
387
0
        }
388
0
    }
389
390
    /* Update the PubSubManager state. It will go from STOPPING to STOPPED when
391
     * the last socket has closed. */
392
0
    UA_PubSubManager_setState(psm, psm->sc.state);
393
394
0
    return ret;
395
0
}
396
397
UA_StatusCode
398
UA_ReaderGroup_setEncryptionKeys(UA_PubSubManager *psm, UA_ReaderGroup *rg,
399
                                 UA_UInt32 securityTokenId,
400
                                 const UA_ByteString signingKey,
401
                                 const UA_ByteString encryptingKey,
402
0
                                 const UA_ByteString keyNonce) {
403
0
    if(rg->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON) {
404
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
405
0
                              "JSON encoding is enabled. The message security is "
406
0
                              "only defined for the UADP message mapping.");
407
0
        return UA_STATUSCODE_BADINTERNALERROR;
408
0
    }
409
410
0
    UA_PubSubSecurityPolicy *sp = rg->config.securityPolicy;
411
0
    if(!sp) {
412
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
413
0
                              "No SecurityPolicy configured for the ReaderGroup");
414
0
        return UA_STATUSCODE_BADINTERNALERROR;
415
0
    }
416
417
0
    if(securityTokenId != rg->securityTokenId) {
418
0
        rg->securityTokenId = securityTokenId;
419
0
        rg->nonceSequenceNumber = 1;
420
0
    }
421
422
    /* Create a new context */
423
0
    if(!rg->securityPolicyContext) {
424
0
        return sp->newGroupContext(sp, &signingKey, &encryptingKey, &keyNonce,
425
0
                                   &rg->securityPolicyContext);
426
0
    }
427
428
    /* Update the context */
429
0
    return sp->setSecurityKeys(sp, rg->securityPolicyContext, &signingKey,
430
0
                               &encryptingKey, &keyNonce);
431
0
}
432
433
UA_Boolean
434
UA_ReaderGroup_process(UA_PubSubManager *psm, UA_ReaderGroup *rg,
435
0
                       UA_NetworkMessage *nm) {
436
    /* Check if the ReaderGroup is enabled */
437
0
    if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
438
0
       rg->head.state != UA_PUBSUBSTATE_PREOPERATIONAL)
439
0
        return false;
440
441
    /* Set to operational if required */
442
0
    rg->hasReceived = true;
443
0
    UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state);
444
445
    /* Safe iteration. The current Reader might be deleted in the ReaderGroup
446
     * _setPubSubState callback. */
447
0
    UA_Boolean processed = false;
448
0
    UA_DataSetReader *reader, *reader_tmp;
449
0
    LIST_FOREACH_SAFE(reader, &rg->readers, listEntry, reader_tmp) {
450
        /* Check if the reader is enabled */
451
0
        if(reader->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
452
0
           reader->head.state != UA_PUBSUBSTATE_PREOPERATIONAL)
453
0
            continue;
454
455
0
        UA_StatusCode res = UA_DataSetReader_checkIdentifier(psm, reader, nm);
456
0
        if(res != UA_STATUSCODE_GOOD)
457
0
            continue;
458
459
        /* Update the ReaderGroup state if this is the first received message */
460
0
        if(!rg->hasReceived) {
461
0
            rg->hasReceived = true;
462
0
            UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state);
463
0
        }
464
465
        /* The message was processed by at least one reader */
466
0
        processed = true;
467
468
0
        UA_LOG_TRACE_PUBSUB(psm->logging, rg, "Processing a NetworkMessage");
469
470
        /* No payload header. The message ontains a single DataSetMessage that
471
         * is processed by every Reader. */
472
0
        if(!nm->payloadHeaderEnabled) {
473
0
            UA_DataSetReader_process(psm, reader, nm->payload.dataSetMessages);
474
0
            continue;
475
0
        }
476
477
        /* Process only the payloads where the WriterId from the header is expected */
478
0
        for(size_t i = 0; i < nm->messageCount; i++) {
479
0
            if(reader->config.dataSetWriterId == nm->dataSetWriterIds[i])
480
0
                UA_DataSetReader_process(psm, reader, &nm->payload.dataSetMessages[i]);
481
0
        }
482
0
    }
483
484
0
    return processed;
485
0
}
486
487
UA_StatusCode
488
UA_ReaderGroup_decodeNetworkMessage(UA_PubSubManager *psm,
489
                                    UA_ReaderGroup *rg,
490
                                    UA_ByteString buffer,
491
0
                                    UA_NetworkMessage *nm) {
492
    /* Set up the decoding context */
493
0
    PubSubDecodeCtx ctx;
494
0
    memset(&ctx, 0, sizeof(PubSubDecodeCtx));
495
0
    ctx.ctx.pos = buffer.data;
496
0
    ctx.ctx.end = buffer.data + buffer.length;
497
0
    ctx.ctx.opts.customTypes = psm->sc.server->config.customDataTypes;
498
499
    /* Decode the headers. This sets the number of DataSetMessages and retrieves
500
     * the DataSetWriterIds. Those get matched to the readers below. */
501
0
    UA_StatusCode rv = UA_NetworkMessage_decodeHeaders(&ctx, nm);
502
0
    if(rv != UA_STATUSCODE_GOOD) {
503
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
504
0
                              "PubSub receive. decoding headers failed");
505
0
        UA_NetworkMessage_clear(nm);
506
0
        return rv;
507
0
    }
508
509
    /* Find a matching reader. Otherwise skip for this ReaderGroup */
510
0
    UA_DataSetReader *dsr;
511
0
    LIST_FOREACH(dsr, &rg->readers, listEntry) {
512
0
        rv = UA_DataSetReader_checkIdentifier(psm, dsr, nm);
513
0
        if(rv == UA_STATUSCODE_GOOD)
514
0
            break;
515
0
    }
516
0
    if(!dsr) {
517
0
        UA_NetworkMessage_clear(nm);
518
0
        return UA_STATUSCODE_BADNOTFOUND;
519
0
    }
520
521
    /* Prepare the metadata with information from the readers to decode the
522
     * DataSetMessages */
523
0
    size_t i = 0;
524
0
    UA_STACKARRAY(UA_DataSetMessage_EncodingMetaData, emd, rg->readersCount);
525
0
    memset(emd, 0, sizeof(UA_DataSetMessage_EncodingMetaData) * rg->readersCount);
526
0
    ctx.eo.metaData = emd;
527
0
    ctx.eo.metaDataSize = rg->readersCount;
528
0
    LIST_FOREACH(dsr, &rg->readers, listEntry) {
529
0
        emd[i].dataSetWriterId = dsr->config.dataSetWriterId;
530
0
        emd[i].fields = dsr->config.dataSetMetaData.fields;
531
0
        emd[i].fieldsSize = dsr->config.dataSetMetaData.fieldsSize;
532
0
        i++;
533
0
    }
534
535
    /* Handle missing payload header and "inject" metadata */
536
0
    if(!nm->payloadHeaderEnabled)
537
0
        UA_NetworkMessage_makeSyntheticPayloadHeader(&ctx.eo, nm);
538
539
    /* Decrypt */
540
0
    rv = verifyAndDecryptNetworkMessage(psm->logging, buffer, &ctx.ctx, nm, rg);
541
0
    if(rv != UA_STATUSCODE_GOOD) {
542
0
        UA_NetworkMessage_clear(nm);
543
0
        return rv;
544
0
    }
545
546
    /* Decode the payload */
547
0
    rv = UA_NetworkMessage_decodePayload(&ctx, nm);
548
0
    if(rv != UA_STATUSCODE_GOOD) {
549
0
        UA_NetworkMessage_clear(nm);
550
0
        return rv;
551
0
    }
552
553
0
    rv = UA_NetworkMessage_decodeFooters(&ctx, nm);
554
0
    if(rv != UA_STATUSCODE_GOOD) {
555
0
        UA_NetworkMessage_clear(nm);
556
0
        return rv;
557
0
    }
558
559
0
    return UA_STATUSCODE_GOOD;
560
0
}
561
562
#ifdef UA_ENABLE_JSON_ENCODING
563
UA_StatusCode
564
UA_ReaderGroup_decodeNetworkMessageJSON(UA_PubSubManager *psm,
565
                                        UA_ReaderGroup *rg,
566
                                        UA_ByteString buffer,
567
0
                                        UA_NetworkMessage *nm) {
568
    /* Set up the decoding options */
569
0
    UA_DecodeJsonOptions jo;
570
0
    memset(&jo, 0, sizeof(jo));
571
0
    jo.customTypes = psm->sc.server->config.customDataTypes;
572
573
    /* Prepare the metadata with information from the readers to decode the
574
     * DataSetMessages */
575
0
    UA_NetworkMessage_EncodingOptions eo;
576
0
    size_t i = 0;
577
0
    UA_STACKARRAY(UA_DataSetMessage_EncodingMetaData, emd, rg->readersCount);
578
0
    memset(emd, 0, sizeof(UA_DataSetMessage_EncodingMetaData) * rg->readersCount);
579
0
    eo.metaData = emd;
580
0
    eo.metaDataSize = rg->readersCount;
581
582
0
    UA_DataSetReader *dsr;
583
0
    LIST_FOREACH(dsr, &rg->readers, listEntry) {
584
0
        emd[i].dataSetWriterId = dsr->config.dataSetWriterId;
585
0
        emd[i].fields = dsr->config.dataSetMetaData.fields;
586
0
        emd[i].fieldsSize = dsr->config.dataSetMetaData.fieldsSize;
587
0
        i++;
588
0
    }
589
590
    /* Decode */
591
0
    return UA_NetworkMessage_decodeJson(&buffer, nm, &eo, &jo);
592
0
}
593
#endif
594
595
/******************************/
596
/* Decrypt the NetworkMessage */
597
/******************************/
598
599
static UA_StatusCode
600
needsDecryption(const UA_Logger *logger,
601
                const UA_NetworkMessage *networkMessage,
602
                const UA_MessageSecurityMode securityMode,
603
0
                UA_Boolean *doDecrypt) {
604
0
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
605
0
    UA_Boolean requiresEncryption = securityMode > UA_MESSAGESECURITYMODE_SIGN;
606
0
    UA_Boolean isEncrypted = networkMessage->securityHeader.networkMessageEncrypted;
607
608
0
    if(isEncrypted && requiresEncryption) {
609
0
        *doDecrypt = true;
610
0
    } else if(!isEncrypted && !requiresEncryption) {
611
0
        *doDecrypt = false;
612
0
    } else {
613
0
        if(isEncrypted) {
614
0
            UA_LOG_ERROR(logger, UA_LOGCATEGORY_SECURITYPOLICY,
615
0
                         "PubSub receive. "
616
0
                         "Message is encrypted but ReaderGroup does not expect encryption");
617
0
            retval = UA_STATUSCODE_BADSECURITYMODEINSUFFICIENT;
618
0
        } else {
619
0
            UA_LOG_ERROR(logger, UA_LOGCATEGORY_SECURITYPOLICY,
620
0
                         "PubSub receive. "
621
0
                         "Message is not encrypted but ReaderGroup requires encryption");
622
0
            retval = UA_STATUSCODE_BADSECURITYMODEREJECTED;
623
0
        }
624
0
    }
625
0
    return retval;
626
0
}
627
628
static UA_StatusCode
629
needsValidation(const UA_Logger *logger,
630
                const UA_NetworkMessage *networkMessage,
631
                const UA_MessageSecurityMode securityMode,
632
0
                UA_Boolean *doValidate) {
633
0
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
634
0
    UA_Boolean isSigned = networkMessage->securityHeader.networkMessageSigned;
635
0
    UA_Boolean requiresSignature = securityMode > UA_MESSAGESECURITYMODE_NONE;
636
637
0
    if(isSigned &&
638
0
       requiresSignature) {
639
0
        *doValidate = true;
640
0
    } else if(!isSigned && !requiresSignature) {
641
0
        *doValidate = false;
642
0
    } else {
643
0
        if(isSigned) {
644
0
            UA_LOG_ERROR(logger, UA_LOGCATEGORY_SECURITYPOLICY,
645
0
                         "PubSub receive. "
646
0
                         "Message is signed but ReaderGroup does not expect signatures");
647
0
            retval = UA_STATUSCODE_BADSECURITYMODEINSUFFICIENT;
648
0
        } else {
649
0
            UA_LOG_ERROR(logger, UA_LOGCATEGORY_SECURITYPOLICY,
650
0
                         "PubSub receive. "
651
0
                         "Message is not signed but ReaderGroup requires signature");
652
0
            retval = UA_STATUSCODE_BADSECURITYMODEREJECTED;
653
0
        }
654
0
    }
655
0
    return retval;
656
0
}
657
658
UA_StatusCode
659
verifyAndDecryptNetworkMessage(const UA_Logger *logger, UA_ByteString buffer,
660
0
                               Ctx *ctx, UA_NetworkMessage *nm, UA_ReaderGroup *rg) {
661
0
    UA_MessageSecurityMode securityMode = rg->config.securityMode;
662
0
    UA_Boolean doValidate = false;
663
0
    UA_Boolean doDecrypt = false;
664
665
0
    UA_StatusCode rv = needsValidation(logger, nm, securityMode, &doValidate);
666
0
    UA_CHECK_STATUS_WARN(rv, return rv, logger, UA_LOGCATEGORY_SECURITYPOLICY,
667
0
                         "PubSub receive. Validation security mode error");
668
669
0
    rv = needsDecryption(logger, nm, securityMode, &doDecrypt);
670
0
    UA_CHECK_STATUS_WARN(rv, return rv, logger, UA_LOGCATEGORY_SECURITYPOLICY,
671
0
                         "PubSub receive. Decryption security mode error");
672
673
0
    if(!doValidate && !doDecrypt)
674
0
        return UA_STATUSCODE_GOOD;
675
676
0
    UA_PubSubSecurityPolicy *sp = rg->config.securityPolicy;
677
0
    UA_CHECK_MEM_ERROR(sp, return UA_STATUSCODE_BADINVALIDARGUMENT,
678
0
                       logger, UA_LOGCATEGORY_PUBSUB,
679
0
                       "PubSub receive. securityPolicy must be set when security mode"
680
0
                       "is enabled to sign and/or encrypt");
681
682
0
    void *cc = rg->securityPolicyContext;
683
0
    UA_CHECK_MEM_ERROR(cc, return UA_STATUSCODE_BADINVALIDARGUMENT,
684
0
                       logger, UA_LOGCATEGORY_PUBSUB,
685
0
                       "PubSub receive. securityPolicyContext must be initialized "
686
0
                       "when security mode is enabled to sign and/or encrypt");
687
688
    /* Validate the signature */
689
0
    if(doValidate) {
690
0
        size_t sigSize = sp->getSignatureSize(sp, cc);
691
0
        UA_ByteString toBeVerified = {buffer.length - sigSize, buffer.data};
692
0
        UA_ByteString signature = {sigSize, buffer.data + buffer.length - sigSize};
693
694
0
        rv = sp->verify(sp, cc, &toBeVerified, &signature);
695
0
        UA_CHECK_STATUS_WARN(rv, return rv, logger, UA_LOGCATEGORY_SECURITYPOLICY,
696
0
                             "PubSub receive. Signature invalid");
697
698
        /* Remove the signature from the ctx->end. We do not want to decode that. */
699
0
        ctx->end -= sigSize;
700
0
    }
701
702
    /* Decrypt the content */
703
0
    if(doDecrypt) {
704
0
        const UA_ByteString nonce = {
705
0
            (size_t)nm->securityHeader.messageNonceSize,
706
0
            (UA_Byte*)(uintptr_t)nm->securityHeader.messageNonce
707
0
        };
708
0
        rv = sp->setMessageNonce(sp, cc, &nonce);
709
0
        UA_CHECK_STATUS_WARN(rv, return rv, logger, UA_LOGCATEGORY_SECURITYPOLICY,
710
0
                             "PubSub receive. Faulty Nonce set");
711
712
0
        UA_ByteString toBeDecrypted = {(uintptr_t)(ctx->end - ctx->pos), ctx->pos};
713
0
        rv = sp->decrypt(sp, cc, &toBeDecrypted);
714
0
        UA_CHECK_STATUS_WARN(rv, return rv, logger, UA_LOGCATEGORY_SECURITYPOLICY,
715
0
                             "PubSub receive. Faulty Decryption");
716
0
    }
717
718
0
    return UA_STATUSCODE_GOOD;
719
0
}
720
721
/***********************/
722
/* Connection Handling */
723
/***********************/
724
725
static UA_StatusCode
726
UA_ReaderGroup_connectMQTT(UA_PubSubManager *psm, UA_ReaderGroup *rg,
727
                           UA_Boolean validate);
728
729
typedef struct  {
730
    UA_String profileURI;
731
    UA_String protocol;
732
    UA_Boolean json;
733
    UA_StatusCode (*connectReaderGroup)(UA_PubSubManager *psm, UA_ReaderGroup *rg,
734
                                        UA_Boolean validate);
735
} ReaderGroupProfileMapping;
736
737
static ReaderGroupProfileMapping readerGroupProfiles[UA_PUBSUB_PROFILES_SIZE] = {
738
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp"),
739
     UA_STRING_STATIC("udp"), false, NULL},
740
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp"),
741
     UA_STRING_STATIC("mqtt"), false, UA_ReaderGroup_connectMQTT},
742
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"),
743
     UA_STRING_STATIC("mqtt"), true, UA_ReaderGroup_connectMQTT},
744
    {UA_STRING_STATIC("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp"),
745
     UA_STRING_STATIC("eth"), false, NULL}
746
};
747
748
static void
749
UA_ReaderGroup_detachConnection(UA_PubSubManager *psm, UA_ReaderGroup *rg,
750
0
                                UA_ConnectionManager *cm, uintptr_t connectionId) {
751
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
752
0
        if(rg->recvChannels[i] != connectionId)
753
0
            continue;
754
0
        UA_LOG_INFO_PUBSUB(psm->logging, rg, "Detach receive-connection %S %u",
755
0
                           cm->protocol, (unsigned)connectionId);
756
0
        rg->recvChannels[i] = 0;
757
0
        rg->recvChannelsSize--;
758
0
        return;
759
0
    }
760
0
}
761
762
static UA_StatusCode
763
UA_ReaderGroup_attachRecvConnection(UA_PubSubManager *psm, UA_ReaderGroup *rg,
764
0
                                    UA_ConnectionManager *cm, uintptr_t connectionId) {
765
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
766
0
        if(rg->recvChannels[i] == connectionId)
767
0
            return UA_STATUSCODE_GOOD;
768
0
    }
769
0
    if(rg->recvChannelsSize >= UA_PUBSUB_MAXCHANNELS)
770
0
        return UA_STATUSCODE_BADINTERNALERROR;
771
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
772
0
        if(rg->recvChannels[i] != 0)
773
0
            continue;
774
0
        UA_LOG_INFO_PUBSUB(psm->logging, rg, "Attach receive-connection %S %u",
775
0
                           cm->protocol, (unsigned)connectionId);
776
0
        rg->recvChannels[i] = connectionId;
777
0
        rg->recvChannelsSize++;
778
0
        break;
779
0
    }
780
0
    return UA_STATUSCODE_GOOD;
781
0
}
782
783
static void
784
ReaderGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId,
785
                          void *application, void **connectionContext,
786
                          UA_ConnectionState state, const UA_KeyValueMap *params,
787
0
                          UA_ByteString msg) {
788
0
    if(!connectionContext)
789
0
        return;
790
791
    /* Get the context pointers */
792
0
    UA_ReaderGroup *rg = (UA_ReaderGroup*)*connectionContext;
793
0
    UA_PubSubManager *psm = (UA_PubSubManager*)application;
794
0
    UA_Server *server = psm->sc.server;
795
796
0
    lockServer(server);
797
798
    /* The connection is closing in the EventLoop. This is the last callback
799
     * from that connection. Clean up the SecureChannel in the client. */
800
0
    if(state == UA_CONNECTIONSTATE_CLOSING) {
801
        /* Reset the connection identifiers */
802
0
        UA_ReaderGroup_detachConnection(psm, rg, cm, connectionId);
803
804
        /* PSC marked for deletion and the last EventLoop connection has closed */
805
0
        if(rg->deleteFlag && rg->recvChannelsSize == 0) {
806
0
            UA_ReaderGroup_remove(psm, rg);
807
0
            unlockServer(server);
808
0
            return;
809
0
        }
810
811
        /* Reconnect if still operational */
812
0
        UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state);
813
814
        /* Switch the psm state from stopping to stopped once the last
815
         * connection has closed */
816
0
        UA_PubSubManager_setState(psm, psm->sc.state);
817
818
0
        unlockServer(server);
819
0
        return;
820
0
    }
821
822
    /* Store the connectionId (if a new connection) */
823
0
    UA_StatusCode res = UA_ReaderGroup_attachRecvConnection(psm, rg, cm, connectionId);
824
0
    if(res != UA_STATUSCODE_GOOD) {
825
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
826
0
                              "No more space for an additional EventLoop connection");
827
0
        UA_PubSubConnection *c = rg->linkedConnection;
828
0
        if(c && c->cm)
829
0
            c->cm->closeConnection(c->cm, connectionId);
830
0
        unlockServer(server);
831
0
        return;
832
0
    }
833
834
    /* The connection has opened - set the ReaderGroup to operational */
835
0
    UA_ReaderGroup_setPubSubState(psm, rg, rg->head.state);
836
837
    /* No message received */
838
0
    if(msg.length == 0) {
839
0
        unlockServer(server);
840
0
        return;
841
0
    }
842
843
0
    if(rg->head.state != UA_PUBSUBSTATE_OPERATIONAL) {
844
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
845
0
                              "Received a messaage for a non-operational ReaderGroup");
846
0
        unlockServer(server);
847
0
        return;
848
0
    }
849
850
    /* Decode message */
851
0
    UA_NetworkMessage nm;
852
0
    memset(&nm, 0, sizeof(UA_NetworkMessage));
853
0
    if(rg->config.encodingMimeType == UA_PUBSUB_ENCODING_UADP) {
854
0
        res = UA_ReaderGroup_decodeNetworkMessage(psm, rg, msg, &nm);
855
0
    } else { /* if(writerGroup->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON) */
856
0
#ifdef UA_ENABLE_JSON_ENCODING
857
0
        res = UA_ReaderGroup_decodeNetworkMessageJSON(psm, rg, msg, &nm);
858
#else
859
        res = UA_STATUSCODE_BADNOTSUPPORTED;
860
#endif
861
0
    }
862
0
    if(res != UA_STATUSCODE_GOOD) {
863
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
864
0
                              "Verify, decrypt and decode network message failed");
865
0
        unlockServer(server);
866
0
        return;
867
0
    }
868
869
    /* Process the decoded message */
870
0
    UA_ReaderGroup_process(psm, rg, &nm);
871
0
    UA_NetworkMessage_clear(&nm);
872
0
    unlockServer(server);
873
0
}
874
875
static UA_StatusCode
876
UA_ReaderGroup_connectMQTT(UA_PubSubManager *psm, UA_ReaderGroup *rg,
877
0
                           UA_Boolean validate) {
878
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
879
880
0
    UA_PubSubConnection *c = rg->linkedConnection;
881
0
    UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*)
882
0
        c->config.address.data;
883
884
    /* Get the TransportSettings */
885
0
    UA_ExtensionObject *ts = &rg->config.transportSettings;
886
0
    if((ts->encoding != UA_EXTENSIONOBJECT_DECODED &&
887
0
        ts->encoding != UA_EXTENSIONOBJECT_DECODED_NODELETE) ||
888
0
       ts->content.decoded.type !=
889
0
       &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE]) {
890
0
        UA_LOG_ERROR_PUBSUB(psm->logging, rg,
891
0
                            "Wrong TransportSettings type for MQTT");
892
0
        return UA_STATUSCODE_BADINTERNALERROR;
893
0
    }
894
0
    UA_BrokerDataSetReaderTransportDataType *transportSettings =
895
0
        (UA_BrokerDataSetReaderTransportDataType*)ts->content.decoded.data;
896
897
    /* Extract hostname and port */
898
0
    UA_String address;
899
0
    UA_UInt16 port = 1883; /* Default */
900
0
    UA_StatusCode res = UA_parseEndpointUrl(&addressUrl->url, &address, &port, NULL);
901
0
    if(res != UA_STATUSCODE_GOOD) {
902
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c, "Could not parse the MQTT network URL");
903
0
        return res;
904
0
    }
905
906
    /* Set up the connection parameters.
907
     * TODO: Complete the MQTT parameters. */
908
0
    UA_Boolean listen = true;
909
0
    UA_KeyValuePair kvp[5];
910
0
    UA_KeyValueMap kvm = {5, kvp};
911
0
    kvp[0].key = UA_QUALIFIEDNAME(0, "address");
912
0
    UA_Variant_setScalar(&kvp[0].value, &address, &UA_TYPES[UA_TYPES_STRING]);
913
0
    kvp[1].key = UA_QUALIFIEDNAME(0, "subscribe");
914
0
    UA_Variant_setScalar(&kvp[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]);
915
0
    kvp[2].key = UA_QUALIFIEDNAME(0, "port");
916
0
    UA_Variant_setScalar(&kvp[2].value, &port, &UA_TYPES[UA_TYPES_UINT16]);
917
0
    kvp[3].key = UA_QUALIFIEDNAME(0, "topic");
918
0
    UA_Variant_setScalar(&kvp[3].value, &transportSettings->queueName,
919
0
                         &UA_TYPES[UA_TYPES_STRING]);
920
0
    kvp[4].key = UA_QUALIFIEDNAME(0, "validate");
921
0
    UA_Variant_setScalar(&kvp[4].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
922
923
    /* Connect */
924
0
    res = c->cm->openConnection(c->cm, &kvm, psm, rg, ReaderGroupChannelCallback);
925
0
    if(res != UA_STATUSCODE_GOOD) {
926
0
        UA_LOG_ERROR_PUBSUB(psm->logging, rg, "Could not open the MQTT connection");
927
0
    }
928
0
    return res;
929
0
}
930
931
void
932
0
UA_ReaderGroup_disconnect(UA_ReaderGroup *rg) {
933
0
    UA_PubSubConnection *c = rg->linkedConnection;
934
0
    if(!c)
935
0
        return;
936
0
    for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) {
937
0
        if(rg->recvChannels[i] != 0)
938
0
            c->cm->closeConnection(c->cm, rg->recvChannels[i]);
939
0
    }
940
0
}
941
942
UA_Boolean
943
0
UA_ReaderGroup_canConnect(UA_ReaderGroup *rg) {
944
0
    return rg->recvChannelsSize == 0;
945
0
}
946
947
UA_StatusCode
948
0
UA_ReaderGroup_connect(UA_PubSubManager *psm, UA_ReaderGroup *rg, UA_Boolean validate) {
949
0
    UA_Server *server = psm->sc.server;
950
0
    UA_LOCK_ASSERT(&server->serviceMutex);
951
952
    /* Is this a ReaderGroup with custom TransportSettings beyond the
953
     * PubSubConnection? */
954
0
    if(rg->config.transportSettings.encoding == UA_EXTENSIONOBJECT_ENCODED_NOBODY)
955
0
        return UA_STATUSCODE_GOOD;
956
957
0
    UA_EventLoop *el = psm->sc.server->config.eventLoop;
958
0
    if(!el) {
959
0
        UA_LOG_ERROR_PUBSUB(server->config.logging, rg, "No EventLoop configured");
960
0
        return UA_STATUSCODE_BADINTERNALERROR;
961
0
    }
962
963
0
    UA_PubSubConnection *c = rg->linkedConnection;
964
0
    if(!c)
965
0
        return UA_STATUSCODE_BADINTERNALERROR;
966
967
    /* Look up the connection manager for the connection */
968
0
    ReaderGroupProfileMapping *profile = NULL;
969
0
    for(size_t i = 0; i < UA_PUBSUB_PROFILES_SIZE; i++) {
970
0
        if(!UA_String_equal(&c->config.transportProfileUri,
971
0
                            &readerGroupProfiles[i].profileURI))
972
0
            continue;
973
0
        profile = &readerGroupProfiles[i];
974
0
        break;
975
0
    }
976
977
0
    UA_ConnectionManager *cm = (profile) ? getCM(el, profile->protocol) : NULL;
978
0
    if(!cm || (c->cm && cm != c->cm)) {
979
0
        UA_LOG_ERROR_PUBSUB(psm->logging, c,
980
0
                            "The requested profile \"%S\"is not supported",
981
0
                            c->config.transportProfileUri);
982
0
        return UA_STATUSCODE_BADINTERNALERROR;
983
0
    }
984
985
0
    c->cm = cm;
986
0
    c->json = profile->json;
987
988
    /* If no ReaderGroup-specific connections, the ReaderGroup is set to
989
     * operational when we return. */
990
0
    return (profile->connectReaderGroup) ?
991
0
        profile->connectReaderGroup(psm, rg, validate) : UA_STATUSCODE_GOOD;
992
0
}
993
994
/**************/
995
/* Server API */
996
/**************/
997
998
UA_StatusCode
999
UA_Server_addReaderGroup(UA_Server *server, const UA_NodeId connectionIdentifier,
1000
                         const UA_ReaderGroupConfig *readerGroupConfig,
1001
0
                         UA_NodeId *readerGroupIdentifier) {
1002
0
    if(!server || !readerGroupConfig)
1003
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1004
0
    lockServer(server);
1005
0
    UA_PubSubManager *psm = getPSM(server);
1006
0
    UA_StatusCode res =
1007
0
        UA_ReaderGroup_create(psm, connectionIdentifier,
1008
0
                              readerGroupConfig, readerGroupIdentifier);
1009
0
    unlockServer(server);
1010
0
    return res;
1011
0
}
1012
1013
UA_StatusCode
1014
0
UA_Server_removeReaderGroup(UA_Server *server, const UA_NodeId groupIdentifier) {
1015
0
    if(!server)
1016
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1017
0
    lockServer(server);
1018
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
1019
0
    UA_PubSubManager *psm = getPSM(server);
1020
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(psm, groupIdentifier);
1021
0
    if(rg)
1022
0
        UA_ReaderGroup_remove(psm, rg);
1023
0
    else
1024
0
        res = UA_STATUSCODE_BADNOTFOUND;
1025
0
    unlockServer(server);
1026
0
    return res;
1027
0
}
1028
1029
UA_StatusCode
1030
UA_Server_getReaderGroupConfig(UA_Server *server, const UA_NodeId rgId,
1031
0
                               UA_ReaderGroupConfig *config) {
1032
0
    if(!server || !config)
1033
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1034
0
    lockServer(server);
1035
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(getPSM(server), rgId);
1036
0
    UA_StatusCode ret = (rg) ?
1037
0
        UA_ReaderGroupConfig_copy(&rg->config, config) : UA_STATUSCODE_BADNOTFOUND;
1038
0
    unlockServer(server);
1039
0
    return ret;
1040
0
}
1041
1042
UA_StatusCode
1043
UA_Server_getReaderGroupState(UA_Server *server, const UA_NodeId rgId,
1044
0
                              UA_PubSubState *state) {
1045
0
    if(!server || !state)
1046
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1047
0
    lockServer(server);
1048
0
    UA_StatusCode ret = UA_STATUSCODE_BADNOTFOUND;
1049
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(getPSM(server), rgId);
1050
0
    if(rg) {
1051
0
        *state = rg->head.state;
1052
0
        ret = UA_STATUSCODE_GOOD;
1053
0
    }
1054
0
    unlockServer(server);
1055
0
    return ret;
1056
0
}
1057
1058
#ifdef UA_ENABLE_PUBSUB_SKS
1059
UA_StatusCode
1060
UA_Server_setReaderGroupActivateKey(UA_Server *server,
1061
                                    const UA_NodeId readerGroupId) {
1062
    if(!server)
1063
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1064
    lockServer(server);
1065
    UA_PubSubManager *psm = getPSM(server);
1066
    UA_ReaderGroup *rg = UA_ReaderGroup_find(psm, readerGroupId);
1067
    if(!rg || !rg->keyStorage || !rg->keyStorage->currentItem) {
1068
        unlockServer(server);
1069
        return UA_STATUSCODE_BADNOTFOUND;
1070
    }
1071
    UA_StatusCode ret =
1072
        UA_PubSubKeyStorage_activateKeyToChannelContext(psm, rg->head.identifier,
1073
                                                        rg->config.securityGroupId);
1074
    unlockServer(server);
1075
    return ret;
1076
}
1077
#endif
1078
1079
UA_StatusCode
1080
0
UA_Server_enableReaderGroup(UA_Server *server, const UA_NodeId readerGroupId){
1081
0
    if(!server)
1082
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1083
0
    lockServer(server);
1084
0
    UA_PubSubManager *psm = getPSM(server);
1085
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(psm, readerGroupId);
1086
0
    UA_StatusCode ret = (rg) ?
1087
0
        UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_OPERATIONAL) :
1088
0
        UA_STATUSCODE_BADNOTFOUND;
1089
0
    unlockServer(server);
1090
0
    return ret;
1091
0
}
1092
1093
UA_StatusCode
1094
0
UA_Server_disableReaderGroup(UA_Server *server, const UA_NodeId readerGroupId){
1095
0
    if(!server)
1096
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1097
0
    lockServer(server);
1098
0
    UA_PubSubManager *psm = getPSM(server);
1099
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(psm, readerGroupId);
1100
0
    UA_StatusCode ret = (rg) ?
1101
0
        UA_ReaderGroup_setPubSubState(psm, rg, UA_PUBSUBSTATE_DISABLED) :
1102
0
        UA_STATUSCODE_BADNOTFOUND;
1103
0
    unlockServer(server);
1104
0
    return ret;
1105
0
}
1106
1107
UA_StatusCode
1108
UA_Server_setReaderGroupEncryptionKeys(UA_Server *server,
1109
                                       const UA_NodeId readerGroup,
1110
                                       UA_UInt32 securityTokenId,
1111
                                       const UA_ByteString signingKey,
1112
                                       const UA_ByteString encryptingKey,
1113
0
                                       const UA_ByteString keyNonce) {
1114
0
    if(!server)
1115
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1116
0
    lockServer(server);
1117
0
    UA_PubSubManager *psm = getPSM(server);
1118
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(getPSM(server), readerGroup);
1119
0
    UA_StatusCode res = (rg) ?
1120
0
        UA_ReaderGroup_setEncryptionKeys(psm, rg, securityTokenId, signingKey,
1121
0
                                         encryptingKey, keyNonce) : UA_STATUSCODE_BADNOTFOUND;
1122
0
    unlockServer(server);
1123
0
    return res;
1124
0
}
1125
1126
UA_StatusCode
1127
UA_Server_updateReaderGroupConfig(UA_Server *server, const UA_NodeId rgId,
1128
0
                                  const UA_ReaderGroupConfig *config) {
1129
0
    if(!server || !config)
1130
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1131
1132
0
    lockServer(server);
1133
1134
0
    UA_PubSubManager *psm = getPSM(server);
1135
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(getPSM(server), rgId);
1136
0
    if(!rg) {
1137
0
        unlockServer(server);
1138
0
        return UA_STATUSCODE_BADNOTFOUND;
1139
0
    }
1140
1141
0
    if(UA_PubSubState_isEnabled(rg->head.state)) {
1142
0
        UA_LOG_ERROR_PUBSUB(psm->logging, rg,
1143
0
                            "The ReaderGroup must be disabled to update the config");
1144
0
        unlockServer(server);
1145
0
        return UA_STATUSCODE_BADINTERNALERROR;
1146
0
    }
1147
1148
    /* Store the old config */
1149
0
    UA_ReaderGroupConfig oldConfig = rg->config;
1150
1151
    /* Deep copy of the config */
1152
0
    UA_StatusCode retval = UA_ReaderGroupConfig_copy(config, &rg->config);
1153
0
    if(retval != UA_STATUSCODE_GOOD) {
1154
0
        unlockServer(server);
1155
0
        return retval;
1156
0
    }
1157
1158
    /* Validate the connection settings */
1159
0
    retval = UA_ReaderGroup_connect(psm, rg, true);
1160
0
    if(retval != UA_STATUSCODE_GOOD) {
1161
0
        UA_LOG_ERROR_PUBSUB(psm->logging, rg,
1162
0
                            "Could not validate the connection parameters");
1163
0
        goto errout;
1164
0
    }
1165
1166
#ifdef UA_ENABLE_PUBSUB_SKS
1167
    if(!UA_String_equal(&rg->config.securityGroupId, &oldConfig.securityGroupId) ||
1168
       rg->config.securityMode != oldConfig.securityMode) {
1169
        /* Detach keystorage and reattach if needed */
1170
        if(rg->keyStorage) {
1171
            UA_PubSubKeyStorage_detachKeyStorage(psm, rg->keyStorage);
1172
            rg->keyStorage = NULL;
1173
        }
1174
        if(rg->config.securityMode == UA_MESSAGESECURITYMODE_SIGN ||
1175
           rg->config.securityMode == UA_MESSAGESECURITYMODE_SIGNANDENCRYPT) {
1176
            retval = readerGroupAttachSKSKeystorage(psm, rg);
1177
            if(retval != UA_STATUSCODE_GOOD) {
1178
                UA_LOG_ERROR_PUBSUB(psm->logging, rg,
1179
                                    "Attaching the SKS KeyStorage failed");
1180
                goto errout;
1181
            }
1182
        }
1183
    }
1184
#endif
1185
1186
    /* Clean up and return */
1187
0
    UA_ReaderGroupConfig_clear(&oldConfig);
1188
0
    unlockServer(server);
1189
0
    return UA_STATUSCODE_GOOD;
1190
1191
0
 errout:
1192
0
    UA_ReaderGroupConfig_clear(&rg->config);
1193
0
    rg->config = oldConfig;
1194
0
    unlockServer(server);
1195
0
    return retval;
1196
0
}
1197
1198
#endif /* UA_ENABLE_PUBSUB */