Coverage Report

Created: 2025-08-29 06:10

/src/open62541/src/pubsub/ua_pubsub_internal.h
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 * Copyright (c) 2017-2019 Fraunhofer IOSB (Author: Andreas Ebner)
6
 * Copyright (c) 2019 Kalycito Infotech Private Limited
7
 * Copyright (c) 2020 Yannick Wallerer, Siemens AG
8
 * Copyright (c) 2020, 2022 Thomas Fischer, Siemens AG
9
 * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes)
10
 * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer)
11
 * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf)
12
 * Copyright (c) 2022 Linutronix GmbH (Author: Muddasir Shakil)
13
 */
14
15
#ifndef UA_PUBSUB_INTERNAL_H_
16
#define UA_PUBSUB_INTERNAL_H_
17
18
#define UA_INTERNAL
19
#include <open62541/server.h>
20
#include <open62541/server_pubsub.h>
21
22
#include "mp_printf.h"
23
#include "ua_pubsub_networkmessage.h"
24
#include "../server/ua_server_internal.h"
25
26
/**
27
 * PubSub State Machine
28
 * --------------------
29
 * 
30
 * The following table described the behaviour of components expected during
31
 * state changes and also the integration which is expected between the
32
 * components.
33
 *
34
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
35
 * |**Component**   |       |**Disabled**        |**Paused**      |**Pre-Operational** |**Operational** |**Error**       |
36
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
37
 * |PubSubConnection|Trigger|Manual disable      |Connection      |Manual enable ||    |Pre-Operational |Unrecoverable   |
38
 * |                |       |                    |enabled but the |Recoverable abort of|&& Connected    |abort of the    |
39
 * |                |       |                    |server is not   |EventLoop connection|EventLoop       |EventLoop       |
40
 * |                |       |                    |running.        |                    |connection      |connection ||   |
41
 * |                |       |                    |                |                    |                |Internal Error  |
42
 * |                +-------+--------------------+----------------+--------------------+----------------+----------------+
43
 * |                |Action |The underlying      |                |Start the async     |                |Same as the     |
44
 * |                |       |connection is closed|                |opening of the      |                |Disabled case   |
45
 * |                |       |(async). Immediately|                |underlying EventLoop|                |                |
46
 * |                |       |set the EventLoop   |                |connection.         |                |                |
47
 * |                |       |connection context  |                |Automatically switch|                |                |
48
 * |                |       |pointer to NULL. So |                |to operational when |                |                |
49
 * |                |       |that the            |                |the EventLoop       |                |                |
50
 * |                |       |PubSubConnection can|                |connection is fully |                |                |
51
 * |                |       |be freed without    |                |open. This can only |                |                |
52
 * |                |       |waiting for the     |                |be signaled by the  |                |                |
53
 * |                |       |EventLoop connection|                |underlying EventLoop|                |                |
54
 * |                |       |to finish closing.  |                |connection in the   |                |                |
55
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
56
 * |WriterGroup     |Trigger|Manual disable      |WG is enabled &&|WG is enabled &&    |WG is enabled &&|Internal error  |
57
 * |                |       |                    |PubSubConnection|PubSubConnection    |PubSubConnection|                |
58
 * |                |       |                    |not enabled     |Pre-Operational     |Operational     |                |
59
 * |                +-------+--------------------+----------------+--------------------+----------------+----------------+
60
 * |                |Action |Publish callback    |Publish callback|Publish callback    |Publish callback|Publish callback|
61
 * |                |       |deregistered        |deregistered    |deregistered        |registered      |deregistered    |
62
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
63
 * |DataSetWriter   |Trigger|Manual disable      |DSW enabled &&  |DSW enabled && WG   |DSW enabled &&  |Internal error  |
64
 * |                |       |                    |WG is not       |Pre-Operational     |WG is           |                |
65
 * |                |       |                    |enabled         |                    |Operational     |                |
66
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
67
 * |ReaderGroup     |Trigger|Manual disable      |RG enabled &&   |RG enabled &&       |RG enabled &&   |Internal error  |
68
 * |                |       |                    |PubSubConnection|(PubSubConnection   |PubSubConnection|                |
69
 * |                |       |                    |not enabled     |Pre-Operational ||  |Operational &&  |                |
70
 * |                |       |                    |                |RG-connection not   |RG-connection   |                |
71
 * |                |       |                    |                |fully established)  |established     |                |
72
 * |                +-------+--------------------+----------------+--------------------+----------------+----------------+
73
 * |                |Action |RG connection       |RG connection   |RG connection       |RG connection   |RG connection   |
74
 * |                |       |disconnected        |disconnected    |connected           |connected       |disconnected    |
75
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
76
 * |DataSetReader   |Trigger|Manual disable      |DSR enabled &&  |DSR enabled && RG   |DSR enabled &&  |Internal error  |
77
 * |                |       |                    |RG not enabled  |Pre-Operational     |RG Operational  |                |
78
 * +----------------+-------+--------------------+----------------+--------------------+----------------+----------------+
79
 */
80
81
_UA_BEGIN_DECLS
82
83
#ifdef UA_ENABLE_PUBSUB
84
85
/* Max number of underlying sockets for sending and receiving for every
86
 * PubSubConnection. Note that a PubSubConnection may have WriterGroups with
87
 * dedicated sockets. Because for UDP unicast only the WriterGroup has the
88
 * target host information. */
89
0
#define UA_PUBSUB_MAXCHANNELS 8
90
91
0
#define UA_PUBSUB_PROFILES_SIZE 4
92
93
struct UA_WriterGroup;
94
typedef struct UA_WriterGroup UA_WriterGroup;
95
96
struct UA_ReaderGroup;
97
typedef struct UA_ReaderGroup UA_ReaderGroup;
98
99
struct UA_SecurityGroup;
100
typedef struct UA_SecurityGroup UA_SecurityGroup;
101
102
struct UA_DataSetReader;
103
typedef struct UA_DataSetReader UA_DataSetReader;
104
105
struct UA_PubSubManager;
106
typedef struct UA_PubSubManager UA_PubSubManager;
107
108
struct UA_PubSubKeyStorage;
109
typedef struct UA_PubSubKeyStorage UA_PubSubKeyStorage;
110
111
/* Get the matching ConnectionManager from the EventLoop */
112
UA_ConnectionManager *
113
getCM(UA_EventLoop *el, UA_String protocol);
114
115
const char *
116
UA_PubSubState_name(UA_PubSubState state);
117
118
/* A component is considered enabled if it is not in the DISABLED or ERROR
119
 * state. All other states (also PAUSED) can lead to automatic recovery into
120
 * OPERATIONAL. */
121
static UA_INLINE UA_Boolean
122
0
UA_PubSubState_isEnabled(UA_PubSubState state) {
123
0
    return (state != UA_PUBSUBSTATE_DISABLED &&
124
0
            state != UA_PUBSUBSTATE_ERROR);
125
0
}
Unexecuted instantiation: ua_pubsub_connection.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_dataset.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_writer.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_writergroup.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_reader.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_readergroup.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_manager.c:UA_PubSubState_isEnabled
Unexecuted instantiation: ua_pubsub_ns0.c:UA_PubSubState_isEnabled
126
127
/* All PubSubComponents share the same header structure */
128
129
typedef struct {
130
    UA_NodeId identifier;
131
    UA_PubSubComponentType componentType;
132
    UA_PubSubState state;
133
    UA_String logIdString; /* Precomputed logging prefix */
134
    UA_Boolean transientState; /* We are in the middle of a state update */
135
} UA_PubSubComponentHead;
136
137
#define UA_LOG_PUBSUB_INTERNAL(LOGGER, LEVEL, COMPONENT, MSG, ...)      \
138
    if(UA_LOGLEVEL <= UA_LOGLEVEL_##LEVEL) {                            \
139
        UA_LOG_##LEVEL(LOGGER, UA_LOGCATEGORY_PUBSUB, "%S" MSG "%.0s",  \
140
                       (COMPONENT)->head.logIdString, __VA_ARGS__);     \
141
    }
142
143
#define UA_LOG_TRACE_PUBSUB(LOGGER, COMPONENT, ...)                          \
144
0
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, TRACE, COMPONENT, __VA_ARGS__, ""))
145
#define UA_LOG_DEBUG_PUBSUB(LOGGER, COMPONENT, ...)                          \
146
0
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, DEBUG, COMPONENT, __VA_ARGS__, ""))
147
#define UA_LOG_INFO_PUBSUB(LOGGER, COMPONENT, ...)                           \
148
0
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, INFO, COMPONENT, __VA_ARGS__, ""))
149
#define UA_LOG_WARNING_PUBSUB(LOGGER, COMPONENT, ...)                        \
150
0
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, WARNING, COMPONENT, __VA_ARGS__, ""))
151
#define UA_LOG_ERROR_PUBSUB(LOGGER, COMPONENT, ...)                          \
152
0
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, ERROR, COMPONENT, __VA_ARGS__, ""))
153
#define UA_LOG_FATAL_PUBSUB(LOGGER, COMPONENT, ...)                          \
154
    UA_MACRO_EXPAND(UA_LOG_PUBSUB_INTERNAL(LOGGER, FATAL, COMPONENT, __VA_ARGS__, ""))
155
156
void
157
UA_PubSubComponentHead_clear(UA_PubSubComponentHead *psch);
158
159
/**********************************************/
160
/*            PublishedDataSet                */
161
/**********************************************/
162
163
typedef struct UA_PublishedDataSet {
164
    UA_PubSubComponentHead head;
165
    TAILQ_ENTRY(UA_PublishedDataSet) listEntry;
166
    TAILQ_HEAD(, UA_DataSetField) fields;
167
    UA_PublishedDataSetConfig config;
168
    UA_DataSetMetaDataType dataSetMetaData;
169
    UA_UInt16 fieldSize;
170
    UA_UInt16 promotedFieldsCount;
171
172
    /* The counter is required because the PDS has not state.
173
     * Check if it is actively used when changes are introduced. */
174
    UA_UInt16 configurationFreezeCounter;
175
} UA_PublishedDataSet;
176
177
UA_StatusCode
178
UA_PublishedDataSetConfig_copy(const UA_PublishedDataSetConfig *src,
179
                               UA_PublishedDataSetConfig *dst);
180
181
UA_PublishedDataSet *
182
UA_PublishedDataSet_find(UA_PubSubManager *psm, const UA_NodeId id);
183
184
UA_PublishedDataSet *
185
UA_PublishedDataSet_findByName(UA_PubSubManager *psm, const UA_String name);
186
187
UA_AddPublishedDataSetResult
188
UA_PublishedDataSet_create(UA_PubSubManager *psm,
189
                           const UA_PublishedDataSetConfig *publishedDataSetConfig,
190
                           UA_NodeId *pdsIdentifier);
191
192
UA_StatusCode
193
UA_PublishedDataSet_remove(UA_PubSubManager *psm, UA_PublishedDataSet *pds);
194
195
/*********************/
196
/* SubscribedDataSet */
197
/*********************/
198
199
typedef struct UA_SubscribedDataSet {
200
    UA_PubSubComponentHead head;
201
    TAILQ_ENTRY(UA_SubscribedDataSet) listEntry;
202
    UA_SubscribedDataSetConfig config;
203
    UA_DataSetReader *connectedReader;
204
} UA_SubscribedDataSet;
205
206
UA_StatusCode
207
UA_SubscribedDataSetConfig_copy(const UA_SubscribedDataSetConfig *src,
208
                                UA_SubscribedDataSetConfig *dst);
209
210
UA_SubscribedDataSet *
211
UA_SubscribedDataSet_find(UA_PubSubManager *psm, const UA_NodeId id);
212
213
UA_SubscribedDataSet *
214
UA_SubscribedDataSet_findByName(UA_PubSubManager *psm, const UA_String name);
215
216
void
217
UA_SubscribedDataSet_remove(UA_PubSubManager *psm, UA_SubscribedDataSet *sds);
218
219
/**********************************************/
220
/*               Connection                   */
221
/**********************************************/
222
223
typedef struct UA_PubSubConnection {
224
    UA_PubSubComponentHead head;
225
    TAILQ_ENTRY(UA_PubSubConnection) listEntry;
226
227
    /* The send/recv connections are only opened if the state is operational */
228
    UA_PubSubConnectionConfig config;
229
    UA_Boolean json; /* Extracted from the TransportProfileUrl */
230
231
    /* Channels belonging to the PubSubConnection. Send channels belong to
232
     * WriterGroups, recv channels belong to ReaderGroups. We only open channels 
233
     * if there is at least one WriterGroup/ReaderGroup respectively.
234
     *
235
     * Some channels belong exclusively to just one WriterGroup/ReaderGroup that
236
     * defines additional connection properties. For example an MQTT topic name
237
     * or QoS parameters. In that case a dedicated NetworkCallback is used that
238
     * takes this ReaderGroup/WriterGroup directly as context. */
239
    UA_ConnectionManager *cm;
240
    uintptr_t recvChannels[UA_PUBSUB_MAXCHANNELS];
241
    size_t recvChannelsSize;
242
    uintptr_t sendChannel;
243
244
    size_t writerGroupsSize;
245
    LIST_HEAD(, UA_WriterGroup) writerGroups;
246
247
    size_t readerGroupsSize;
248
    LIST_HEAD(, UA_ReaderGroup) readerGroups;
249
250
    UA_DateTime silenceErrorUntil; /* Avoid generating too many logs */
251
252
    UA_Boolean deleteFlag; /* To be deleted - in addition to the PubSubState */
253
    UA_DelayedCallback dc; /* For delayed freeing */
254
} UA_PubSubConnection;
255
256
UA_PubSubConnection *
257
UA_PubSubConnection_find(UA_PubSubManager *psm, const UA_NodeId id);
258
259
UA_StatusCode
260
UA_PubSubConnection_create(UA_PubSubManager *psm,
261
                           const UA_PubSubConnectionConfig *connectionConfig,
262
                           UA_NodeId *connectionIdentifier);
263
264
UA_StatusCode
265
UA_PubSubConnection_delete(UA_PubSubManager *psm, UA_PubSubConnection *c);
266
267
UA_StatusCode
268
UA_PubSubConnection_setPubSubState(UA_PubSubManager *psm, UA_PubSubConnection *c,
269
                                   UA_PubSubState targetState);
270
271
/**********************************************/
272
/*              DataSetWriter                 */
273
/**********************************************/
274
275
typedef struct UA_DataSetWriterSample {
276
    UA_Boolean valueChanged;
277
    UA_DataValue value;
278
} UA_DataSetWriterSample;
279
280
typedef struct UA_DataSetWriter {
281
    UA_PubSubComponentHead head;
282
    LIST_ENTRY(UA_DataSetWriter) listEntry;
283
284
    UA_DataSetWriterConfig config;
285
    UA_WriterGroup *linkedWriterGroup;
286
    UA_PublishedDataSet *connectedDataSet;
287
    UA_ConfigurationVersionDataType connectedDataSetVersion;
288
289
    /* Deltaframes */
290
    UA_UInt16 deltaFrameCounter; /* count of sent deltaFrames */
291
    size_t lastSamplesCount;
292
    UA_DataSetWriterSample *lastSamples;
293
294
    UA_UInt16 actualDataSetMessageSequenceCount;
295
    UA_Boolean configurationFrozen;
296
    UA_UInt64 pubSubStateTimerId;
297
} UA_DataSetWriter;
298
299
UA_StatusCode
300
UA_DataSetWriterConfig_copy(const UA_DataSetWriterConfig *src,
301
                            UA_DataSetWriterConfig *dst);
302
303
UA_DataSetWriter *
304
UA_DataSetWriter_find(UA_PubSubManager *psm, const UA_NodeId id);
305
306
UA_StatusCode
307
UA_DataSetWriter_setPubSubState(UA_PubSubManager *psm, UA_DataSetWriter *dsw,
308
                                UA_PubSubState targetState);
309
310
UA_StatusCode
311
UA_DataSetWriter_generateDataSetMessage(UA_PubSubManager *psm,
312
                                        UA_DataSetWriter *dsw,
313
                                        UA_DataSetMessage *dsm);
314
315
UA_StatusCode
316
UA_DataSetWriter_create(UA_PubSubManager *psm,
317
                        const UA_NodeId writerGroup, const UA_NodeId dataSet,
318
                        const UA_DataSetWriterConfig *dataSetWriterConfig,
319
                        UA_NodeId *writerIdentifier);
320
321
UA_StatusCode
322
UA_DataSetWriter_remove(UA_PubSubManager *psm, UA_DataSetWriter *dsw);
323
324
/**********************************************/
325
/*               WriterGroup                  */
326
/**********************************************/
327
328
struct UA_WriterGroup {
329
    UA_PubSubComponentHead head;
330
    LIST_ENTRY(UA_WriterGroup) listEntry;
331
332
    UA_WriterGroupConfig config;
333
334
    LIST_HEAD(, UA_DataSetWriter) writers;
335
    UA_UInt32 writersCount;
336
337
    UA_UInt64 publishCallbackId; /* registered if != 0 */
338
    UA_UInt16 sequenceNumber; /* Increased after every sent message */
339
    UA_DateTime lastPublishTimeStamp;
340
341
    /* The ConnectionManager pointer is stored in the Connection. The channels
342
     * are either stored here or in the Connection, but never both. */
343
    UA_PubSubConnection *linkedConnection;
344
    uintptr_t sendChannel;
345
    UA_Boolean deleteFlag;
346
347
    UA_UInt32 securityTokenId;
348
    UA_UInt32 nonceSequenceNumber; /* To be part of the MessageNonce */
349
    void *securityPolicyContext;
350
#ifdef UA_ENABLE_PUBSUB_SKS
351
    UA_PubSubKeyStorage *keyStorage; /* non-owning pointer to keyStorage*/
352
#endif
353
};
354
355
UA_StatusCode
356
UA_WriterGroup_create(UA_PubSubManager *psm, const UA_NodeId connection,
357
                      const UA_WriterGroupConfig *writerGroupConfig,
358
                      UA_NodeId *writerGroupIdentifier);
359
360
UA_StatusCode
361
UA_WriterGroup_remove(UA_PubSubManager *psm, UA_WriterGroup *wg);
362
363
/* Exposed so we can change the publish interval without having to stop */
364
UA_StatusCode
365
UA_WriterGroup_addPublishCallback(UA_PubSubManager *psm, UA_WriterGroup *wg);
366
367
void
368
UA_WriterGroup_removePublishCallback(UA_PubSubManager *psm, UA_WriterGroup *wg);
369
370
UA_StatusCode
371
UA_WriterGroup_setEncryptionKeys(UA_PubSubManager *psm, UA_WriterGroup *wg,
372
                                 UA_UInt32 securityTokenId,
373
                                 const UA_ByteString signingKey,
374
                                 const UA_ByteString encryptingKey,
375
                                 const UA_ByteString keyNonce);
376
377
UA_StatusCode
378
UA_WriterGroupConfig_copy(const UA_WriterGroupConfig *src,
379
                          UA_WriterGroupConfig *dst);
380
381
UA_WriterGroup *
382
UA_WriterGroup_find(UA_PubSubManager *psm, const UA_NodeId id);
383
384
UA_StatusCode
385
UA_WriterGroup_setPubSubState(UA_PubSubManager *psm, UA_WriterGroup *wg,
386
                              UA_PubSubState targetState);
387
388
void
389
UA_WriterGroup_publishCallback(UA_PubSubManager *psm, UA_WriterGroup *wg);
390
391
/**********************************************/
392
/*               DataSetField                 */
393
/**********************************************/
394
395
typedef struct UA_DataSetField {
396
    UA_DataSetFieldConfig config;
397
    TAILQ_ENTRY(UA_DataSetField) listEntry;
398
    UA_NodeId identifier;
399
    UA_NodeId publishedDataSet;     /* parent pds */
400
    UA_FieldMetaData fieldMetaData; /* contains the dataSetFieldId */
401
    UA_UInt64 sampleCallbackId;
402
    UA_Boolean sampleCallbackIsRegistered;
403
} UA_DataSetField;
404
405
UA_StatusCode
406
UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src,
407
                           UA_DataSetFieldConfig *dst);
408
409
UA_DataSetField *
410
UA_DataSetField_find(UA_PubSubManager *psm, const UA_NodeId id);
411
412
UA_DataSetFieldResult
413
UA_DataSetField_remove(UA_PubSubManager *psm, UA_DataSetField *currentField);
414
415
UA_DataSetFieldResult
416
UA_DataSetField_create(UA_PubSubManager *psm, const UA_NodeId publishedDataSet,
417
                       const UA_DataSetFieldConfig *fieldConfig,
418
                       UA_NodeId *fieldIdentifier);
419
420
void
421
UA_PubSubDataSetField_sampleValue(UA_PubSubManager *psm,
422
                                  UA_DataSetField *field,
423
                                  UA_DataValue *value);
424
425
/**********************************************/
426
/*               DataSetReader                */
427
/**********************************************/
428
429
struct UA_DataSetReader {
430
    UA_PubSubComponentHead head;
431
    LIST_ENTRY(UA_DataSetReader) listEntry;
432
433
    UA_DataSetReaderConfig config;
434
    UA_ReaderGroup *linkedReaderGroup;
435
436
    /* MessageReceiveTimeout handling */
437
    UA_UInt64 msgRcvTimeoutTimerId;
438
};
439
440
UA_DataSetReader *
441
UA_DataSetReader_find(UA_PubSubManager *psm, const UA_NodeId id);
442
443
/* Process Network Message using DataSetReader */
444
void
445
UA_DataSetReader_process(UA_PubSubManager *psm,
446
                         UA_DataSetReader *dataSetReader,
447
                         UA_DataSetMessage *dataSetMsg);
448
449
UA_StatusCode
450
UA_DataSetReader_generateDataSetMessage(UA_Server *server,
451
                                        UA_DataSetMessage *dsm,
452
                                        UA_DataSetReader *dsr);
453
454
UA_StatusCode
455
UA_DataSetReader_checkIdentifier(UA_PubSubManager *psm, UA_DataSetReader *dsr,
456
                                 UA_NetworkMessage *msg);
457
458
UA_StatusCode
459
UA_DataSetReader_create(UA_PubSubManager *psm, UA_NodeId readerGroupIdentifier,
460
                        const UA_DataSetReaderConfig *dataSetReaderConfig,
461
                        UA_NodeId *readerIdentifier);
462
463
UA_StatusCode
464
UA_DataSetReader_remove(UA_PubSubManager *psm, UA_DataSetReader *dsr);
465
466
UA_StatusCode
467
DataSetReader_createTargetVariables(UA_PubSubManager *psm, UA_DataSetReader *dsr,
468
                                    size_t targetsSize, const UA_FieldTargetDataType *targets);
469
470
/* Returns an error reason if the target state is `Error` */
471
void
472
UA_DataSetReader_setPubSubState(UA_PubSubManager *psm, UA_DataSetReader *dsr,
473
                                UA_PubSubState targetState, UA_StatusCode errorReason);
474
475
/**********************************************/
476
/*                ReaderGroup                 */
477
/**********************************************/
478
479
struct UA_ReaderGroup {
480
    UA_PubSubComponentHead head;
481
    LIST_ENTRY(UA_ReaderGroup) listEntry;
482
483
    UA_ReaderGroupConfig config;
484
485
    LIST_HEAD(, UA_DataSetReader) readers;
486
    UA_UInt32 readersCount;
487
488
    UA_Boolean hasReceived; /* Received a message since the last _connect */
489
490
    /* The ConnectionManager pointer is stored in the Connection. The channels 
491
     * are either stored here or in the Connection, but never both. */
492
    UA_PubSubConnection *linkedConnection;
493
    uintptr_t recvChannels[UA_PUBSUB_MAXCHANNELS];
494
    size_t recvChannelsSize;
495
    UA_Boolean deleteFlag;
496
497
    UA_UInt32 securityTokenId;
498
    UA_UInt32 nonceSequenceNumber; /* To be part of the MessageNonce */
499
    void *securityPolicyContext;
500
#ifdef UA_ENABLE_PUBSUB_SKS
501
    UA_PubSubKeyStorage *keyStorage;
502
#endif
503
};
504
505
UA_StatusCode
506
UA_ReaderGroup_create(UA_PubSubManager *psm, UA_NodeId connectionId,
507
                      const UA_ReaderGroupConfig *rgc,
508
                      UA_NodeId *readerGroupId);
509
510
UA_StatusCode
511
UA_ReaderGroup_remove(UA_PubSubManager *psm, UA_ReaderGroup *rg);
512
513
UA_StatusCode
514
UA_ReaderGroup_connect(UA_PubSubManager *psm, UA_ReaderGroup *rg,
515
                       UA_Boolean validate);
516
517
UA_Boolean
518
UA_ReaderGroup_canConnect(UA_ReaderGroup *rg);
519
520
void
521
UA_ReaderGroup_disconnect(UA_ReaderGroup *rg);
522
523
UA_StatusCode
524
UA_ReaderGroup_setEncryptionKeys(UA_PubSubManager *psm, UA_ReaderGroup *rg,
525
                                 UA_UInt32 securityTokenId,
526
                                 const UA_ByteString signingKey,
527
                                 const UA_ByteString encryptingKey,
528
                                 const UA_ByteString keyNonce);
529
530
UA_StatusCode
531
UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
532
                          UA_ReaderGroupConfig *dst);
533
534
UA_ReaderGroup *
535
UA_ReaderGroup_find(UA_PubSubManager *psm, const UA_NodeId id);
536
537
UA_StatusCode
538
UA_ReaderGroup_setPubSubState(UA_PubSubManager *psm, UA_ReaderGroup *rg,
539
                              UA_PubSubState targetState);
540
541
UA_Boolean
542
UA_ReaderGroup_process(UA_PubSubManager *psm, UA_ReaderGroup *rg,
543
                       UA_NetworkMessage *nm);
544
545
/* The buffer is the entire message. The ctx->pos points after the decoded
546
 * header. The ctx->end is modified to remove padding, etc. */
547
UA_StatusCode
548
verifyAndDecryptNetworkMessage(const UA_Logger *logger, UA_ByteString buffer,
549
                               Ctx *ctx, UA_NetworkMessage *nm,
550
                               UA_ReaderGroup *rg);
551
552
UA_StatusCode
553
UA_ReaderGroup_decodeNetworkMessage(UA_PubSubManager *psm,
554
                                    UA_ReaderGroup *rg,
555
                                    UA_ByteString buffer,
556
                                    UA_NetworkMessage *nm);
557
558
#ifdef UA_ENABLE_JSON_ENCODING
559
UA_StatusCode
560
UA_ReaderGroup_decodeNetworkMessageJSON(UA_PubSubManager *psm,
561
                                        UA_ReaderGroup *rg,
562
                                        UA_ByteString buffer,
563
                                        UA_NetworkMessage *nm);
564
#endif
565
566
#ifdef UA_ENABLE_PUBSUB_SKS
567
568
/*********************************************************/
569
/*                    SecurityGroup                      */
570
/*********************************************************/
571
572
struct UA_SecurityGroup {
573
    UA_String securityGroupId;
574
    UA_SecurityGroupConfig config;
575
    UA_PubSubKeyStorage *keyStorage;
576
    UA_NodeId securityGroupNodeId;
577
    UA_UInt64 callbackId;
578
    UA_DateTime baseTime;
579
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
580
    UA_NodeId securityGroupFolderId;
581
#endif
582
    TAILQ_ENTRY(UA_SecurityGroup) listEntry;
583
};
584
585
UA_StatusCode
586
UA_SecurityGroupConfig_copy(const UA_SecurityGroupConfig *src,
587
                            UA_SecurityGroupConfig *dst);
588
589
UA_SecurityGroup *
590
UA_SecurityGroup_findByName(UA_PubSubManager *psm, const UA_String name);
591
592
UA_SecurityGroup *
593
UA_SecurityGroup_find(UA_PubSubManager *psm, const UA_NodeId id);
594
595
void
596
UA_SecurityGroup_remove(UA_PubSubManager *psm, UA_SecurityGroup *sg);
597
598
#endif /* UA_ENABLE_PUBSUB_SKS */
599
600
/******************/
601
/* PubSub Manager */
602
/******************/
603
604
typedef enum {
605
    UA_WRITER_GROUP = 0,
606
    UA_DATA_SET_WRITER = 1,
607
} UA_ReserveIdType;
608
609
typedef struct UA_ReserveId {
610
    UA_UInt16 id;
611
    UA_ReserveIdType reserveIdType;
612
    UA_String transportProfileUri;
613
    UA_NodeId sessionId;
614
    ZIP_ENTRY(UA_ReserveId) treeEntry;
615
} UA_ReserveId;
616
617
typedef ZIP_HEAD(UA_ReserveIdTree, UA_ReserveId) UA_ReserveIdTree;
618
619
struct UA_PubSubManager {
620
    UA_ServerComponent sc;
621
622
    UA_Logger *logging; /* shortcut to sc->server.logging */
623
624
    UA_UInt64 defaultPublisherId;
625
    /* Connections and PublishedDataSets can exist alone (own lifecycle) -> top
626
     * level components */
627
    size_t connectionsSize;
628
    TAILQ_HEAD(, UA_PubSubConnection) connections;
629
630
    size_t publishedDataSetsSize;
631
    TAILQ_HEAD(, UA_PublishedDataSet) publishedDataSets;
632
633
    size_t subscribedDataSetsSize;
634
    TAILQ_HEAD(, UA_SubscribedDataSet) subscribedDataSets;
635
636
    size_t reserveIdsSize;
637
    UA_ReserveIdTree reserveIds;
638
639
#ifdef UA_ENABLE_PUBSUB_SKS
640
    LIST_HEAD(, UA_PubSubKeyStorage) pubSubKeyList;
641
642
    size_t securityGroupsSize;
643
    TAILQ_HEAD(, UA_SecurityGroup) securityGroups;
644
#endif
645
646
#ifndef UA_ENABLE_PUBSUB_INFORMATIONMODEL
647
    UA_UInt32 uniqueIdCount;
648
#endif
649
};
650
651
static UA_INLINE UA_PubSubManager *
652
0
getPSM(UA_Server *server) {
653
0
    return (UA_PubSubManager*)getServerComponentByName(server, UA_STRING("pubsub"));
654
0
}
Unexecuted instantiation: ua_pubsub_connection.c:getPSM
Unexecuted instantiation: ua_pubsub_dataset.c:getPSM
Unexecuted instantiation: ua_pubsub_writer.c:getPSM
Unexecuted instantiation: ua_pubsub_writergroup.c:getPSM
Unexecuted instantiation: ua_pubsub_reader.c:getPSM
Unexecuted instantiation: ua_pubsub_readergroup.c:getPSM
Unexecuted instantiation: ua_pubsub_manager.c:getPSM
Unexecuted instantiation: ua_pubsub_ns0.c:getPSM
655
656
UA_StatusCode
657
UA_PubSubManager_clear(UA_PubSubManager *psm);
658
659
void
660
UA_PubSubManager_setState(UA_PubSubManager *psm,
661
                          UA_LifecycleState state);
662
663
UA_StatusCode
664
UA_PubSubManager_reserveIds(UA_PubSubManager *psm, UA_NodeId sessionId,
665
                            UA_UInt16 numRegWriterGroupIds,
666
                            UA_UInt16 numRegDataSetWriterIds,
667
                            UA_String transportProfileUri, UA_UInt16 **writerGroupIds,
668
                            UA_UInt16 **dataSetWriterIds);
669
670
void
671
UA_PubSubManager_freeIds(UA_PubSubManager *psm);
672
673
#ifndef UA_ENABLE_PUBSUB_INFORMATIONMODEL
674
void
675
UA_PubSubManager_generateUniqueNodeId(UA_PubSubManager *psm, UA_NodeId *nodeId);
676
#endif
677
678
UA_Guid
679
UA_PubSubManager_generateUniqueGuid(UA_PubSubManager *psm);
680
681
UA_UInt32
682
UA_PubSubConfigurationVersionTimeDifference(UA_DateTime now);
683
684
/************************************/
685
/* Information Model Representation */
686
/************************************/
687
688
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL /* conditional compilation */
689
690
UA_StatusCode
691
initPubSubNS0(UA_Server *server);
692
693
UA_StatusCode
694
addPubSubConnectionRepresentation(UA_Server *server, UA_PubSubConnection *connection);
695
696
UA_StatusCode
697
addWriterGroupRepresentation(UA_Server *server, UA_WriterGroup *writerGroup);
698
699
UA_StatusCode
700
addReaderGroupRepresentation(UA_Server *server, UA_ReaderGroup *readerGroup);
701
702
UA_StatusCode
703
addDataSetWriterRepresentation(UA_Server *server, UA_DataSetWriter *dataSetWriter);
704
705
UA_StatusCode
706
addPublishedDataItemsRepresentation(UA_Server *server, UA_PublishedDataSet *publishedDataSet);
707
708
UA_StatusCode
709
addSubscribedDataSetRepresentation(UA_Server *server, UA_SubscribedDataSet *subscribedDataSet);
710
711
UA_StatusCode
712
addDataSetReaderRepresentation(UA_Server *server, UA_DataSetReader *dataSetReader);
713
714
UA_StatusCode
715
connectDataSetReaderToDataSet(UA_Server *server, UA_NodeId dsrId, UA_NodeId sdsId);
716
717
void
718
disconnectDataSetReaderToDataSet(UA_Server *server, UA_NodeId dsrId);
719
720
#ifdef UA_ENABLE_PUBSUB_SKS
721
UA_StatusCode
722
addSecurityGroupRepresentation(UA_Server *server, UA_SecurityGroup *securityGroup);
723
#endif /* UA_ENABLE_PUBSUB_SKS */
724
725
#endif /* UA_ENABLE_PUBSUB_INFORMATIONMODEL */
726
727
#endif /* UA_ENABLE_PUBSUB */
728
729
_UA_END_DECLS
730
731
#endif /* UA_PUBSUB_INTERNAL_H_ */