Coverage Report

Created: 2025-10-13 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/open62541/src/pubsub/ua_pubsub_reader.c
Line
Count
Source
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
 *
5
 * Copyright (c) 2017-2025 Fraunhofer IOSB (Author: Andreas Ebner)
6
 * Copyright (c) 2019 Fraunhofer IOSB (Author: Julius Pfrommer)
7
 * Copyright (c) 2019 Kalycito Infotech Private Limited
8
 * Copyright (c) 2021 Fraunhofer IOSB (Author: Jan Hermes)
9
 * Copyright (c) 2022 Siemens AG (Author: Thomas Fischer)
10
 * Copyright (c) 2022 Fraunhofer IOSB (Author: Noel Graf)
11
 */
12
13
#include "ua_pubsub_internal.h"
14
15
#ifdef UA_ENABLE_PUBSUB /* conditional compilation */
16
17
#include "ua_pubsub_networkmessage.h"
18
19
static UA_Boolean
20
0
publisherIdIsMatching(UA_NetworkMessage *msg, UA_PublisherId *idB) {
21
0
    if(!msg->publisherIdEnabled)
22
0
        return true;
23
0
    UA_PublisherId *idA = &msg->publisherId;
24
0
    if(idA->idType != idB->idType)
25
0
        return false;
26
0
    switch(idA->idType) {
27
0
        case UA_PUBLISHERIDTYPE_BYTE:   return idA->id.byte == idB->id.byte;
28
0
        case UA_PUBLISHERIDTYPE_UINT16: return idA->id.uint16 == idB->id.uint16;
29
0
        case UA_PUBLISHERIDTYPE_UINT32: return idA->id.uint32 == idB->id.uint32;
30
0
        case UA_PUBLISHERIDTYPE_UINT64: return idA->id.uint64 == idB->id.uint64;
31
0
        case UA_PUBLISHERIDTYPE_STRING: return UA_String_equal(&idA->id.string, &idB->id.string);
32
0
        default: break;
33
0
    }
34
0
    return false;
35
0
}
36
37
#if UA_LOGLEVEL <= 200
38
static void
39
printPublisherId(char *out, size_t size, UA_PublisherId *id) {
40
    switch(id->idType) {
41
    case UA_PUBLISHERIDTYPE_BYTE:   mp_snprintf(out, size, "(b)%u", (unsigned)id->id.byte); break;
42
    case UA_PUBLISHERIDTYPE_UINT16: mp_snprintf(out, size, "(u16)%u", (unsigned)id->id.uint16); break;
43
    case UA_PUBLISHERIDTYPE_UINT32: mp_snprintf(out, size, "(u32)%u", (unsigned)id->id.uint32); break;
44
    case UA_PUBLISHERIDTYPE_UINT64: mp_snprintf(out, size, "(u64)%lu", id->id.uint64); break;
45
    case UA_PUBLISHERIDTYPE_STRING: mp_snprintf(out, size, "\"%S\"", id->id.string); break;
46
    default: out[0] = 0; break;
47
    }
48
}
49
#endif
50
51
UA_StatusCode
52
UA_DataSetReader_checkIdentifier(UA_PubSubManager *psm, UA_DataSetReader *dsr,
53
0
                                 UA_NetworkMessage *msg) {
54
0
    if(!publisherIdIsMatching(msg, &dsr->config.publisherId)) {
55
#if UA_LOGLEVEL <= 200
56
        char idAStr[512];
57
        char idBStr[512];
58
        printPublisherId(idAStr, 512, &dsr->config.publisherId);
59
        printPublisherId(idBStr, 512, &msg->publisherId);
60
        UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "PublisherId does not match. "
61
                            "Expected %s, received %s", idAStr, idBStr);
62
#endif
63
0
        return UA_STATUSCODE_BADNOTFOUND;
64
0
    }
65
66
0
    UA_ReaderGroup *rg = dsr->linkedReaderGroup;
67
0
    if(rg->config.encodingMimeType == UA_PUBSUB_ENCODING_JSON) {
68
        // TODO
69
        /* if(dsr->config.dataSetWriterId == */
70
        /*    *msg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds) { */
71
        /*     return UA_STATUSCODE_GOOD; */
72
        /* } */
73
74
        /* UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "DataSetWriterId does not match. " */
75
        /*                     "Expected %u, received %u", dsr->config.dataSetWriterId, */
76
        /*                     *msg->payloadHeader.dataSetPayloadHeader.dataSetWriterIds); */
77
0
        return UA_STATUSCODE_BADNOTFOUND;
78
0
    }
79
80
0
    if(msg->groupHeaderEnabled && msg->groupHeader.writerGroupIdEnabled) {
81
0
        if(dsr->config.writerGroupId != msg->groupHeader.writerGroupId) {
82
0
            UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "WriterGroupId does not match. "
83
0
                                "Expected %u, received %u", dsr->config.writerGroupId,
84
0
                                msg->groupHeader.writerGroupId);
85
0
            return UA_STATUSCODE_BADNOTFOUND;
86
0
        }
87
0
    }
88
89
0
    if(msg->payloadHeaderEnabled) {
90
0
        for(size_t i = 0; i < msg->messageCount; i++) {
91
0
            if(dsr->config.dataSetWriterId == msg->dataSetWriterIds[i])
92
0
                return UA_STATUSCODE_GOOD;
93
0
        }
94
0
        UA_LOG_DEBUG_PUBSUB(psm->logging, dsr,
95
0
                            "DataSetWriterIds in the payload do not match");
96
0
        return UA_STATUSCODE_BADNOTFOUND;
97
0
    }
98
99
0
    return UA_STATUSCODE_GOOD;
100
0
}
101
102
UA_DataSetReader *
103
0
UA_DataSetReader_find(UA_PubSubManager *psm, const UA_NodeId id) {
104
0
    if(!psm)
105
0
        return NULL;
106
0
    UA_PubSubConnection *psc;
107
0
    TAILQ_FOREACH(psc, &psm->connections, listEntry) {
108
0
        UA_ReaderGroup *rg;
109
0
        LIST_FOREACH(rg, &psc->readerGroups, listEntry) {
110
0
            UA_DataSetReader *dsr;
111
0
            LIST_FOREACH(dsr, &rg->readers, listEntry) {
112
0
                if(UA_NodeId_equal(&id, &dsr->head.identifier))
113
0
                    return dsr;
114
0
            }
115
0
        }
116
0
    }
117
0
    return NULL;
118
0
}
119
120
static UA_StatusCode
121
0
validateDSRConfig(UA_PubSubManager *psm, UA_DataSetReader *dsr) {
122
    /* Check if used dataSet metaData is valid in context of the rest of the config */
123
0
    if(dsr->config.dataSetFieldContentMask & UA_DATASETFIELDCONTENTMASK_RAWDATA) {
124
0
        for(size_t i = 0; i < dsr->config.dataSetMetaData.fieldsSize; i++) {
125
0
            const UA_FieldMetaData *field = &dsr->config.dataSetMetaData.fields[i];
126
0
            if((field->builtInType == UA_NS0ID_STRING || field->builtInType == UA_NS0ID_BYTESTRING) &&
127
0
               field->maxStringLength == 0) {
128
                /* Fields of type String or ByteString need to have defined
129
                 * MaxStringLength*/
130
0
                UA_LOG_ERROR_PUBSUB(psm->logging, dsr,
131
0
                                    "Add DataSetReader failed. MaxStringLength must be "
132
0
                                    "set in MetaData when using RawData field encoding.");
133
0
                return UA_STATUSCODE_BADCONFIGURATIONERROR;
134
0
            }
135
0
        }
136
0
    }
137
0
    return UA_STATUSCODE_GOOD;
138
0
}
139
140
static void
141
0
disconnectDSR2Standalone(UA_PubSubManager *psm, UA_DataSetReader *dsr) {
142
    /* Check if a sds name is defined */
143
0
    const UA_String sdsName = dsr->config.linkedStandaloneSubscribedDataSetName;
144
0
    if(UA_String_isEmpty(&sdsName))
145
0
        return;
146
147
0
    UA_SubscribedDataSet *sds = UA_SubscribedDataSet_findByName(psm, sdsName);
148
0
    if(!sds)
149
0
        return;
150
151
    /* Remove the backpointer from the sds */
152
0
    sds->connectedReader = NULL;
153
154
    /* Remove the references in the information model */
155
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
156
0
    disconnectDataSetReaderToDataSet(psm->sc.server, dsr->head.identifier);
157
0
#endif
158
0
}
159
160
/* Connect to StandaloneSubscribedDataSet if a name is defined */
161
static UA_StatusCode
162
0
connectDSR2Standalone(UA_PubSubManager *psm, UA_DataSetReader *dsr) {
163
    /* Check if a sds name is defined */
164
0
    const UA_String sdsName = dsr->config.linkedStandaloneSubscribedDataSetName;
165
0
    if(UA_String_isEmpty(&sdsName))
166
0
        return UA_STATUSCODE_GOOD;
167
168
0
    UA_SubscribedDataSet *sds = UA_SubscribedDataSet_findByName(psm, sdsName);
169
0
    if(!sds)
170
0
        return UA_STATUSCODE_BADNOTFOUND;
171
172
    /* Already connected? */
173
0
    if(sds->connectedReader) {
174
0
        if(sds->connectedReader != dsr)
175
0
            UA_LOG_ERROR_PUBSUB(psm->logging, dsr,
176
0
                                "Configured StandaloneSubscribedDataSet already "
177
0
                                "connected to a different DataSetReader");
178
0
        return UA_STATUSCODE_BADINTERNALERROR;
179
0
    }
180
181
    /* Check supported type */
182
0
    if(sds->config.subscribedDataSetType != UA_PUBSUB_SDS_TARGET) {
183
0
        UA_LOG_ERROR_PUBSUB(psm->logging, dsr,
184
0
                            "Not implemented! Currently only SubscribedDataSet as "
185
0
                            "TargetVariables is implemented");
186
0
        return UA_STATUSCODE_BADNOTIMPLEMENTED;
187
0
    }
188
189
0
    UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "Connecting SubscribedDataSet");
190
191
    /* Copy the metadata from the sds */
192
0
    UA_DataSetMetaDataType metaData;
193
0
    UA_StatusCode res = UA_DataSetMetaDataType_copy(&sds->config.dataSetMetaData, &metaData);
194
0
    if(res != UA_STATUSCODE_GOOD)
195
0
        return res;
196
197
    /* Prepare the input for _createTargetVariables and call it */
198
0
    UA_TargetVariablesDataType *tvs = &sds->config.subscribedDataSet.target;
199
0
    res = DataSetReader_createTargetVariables(psm, dsr, tvs->targetVariablesSize,
200
0
                                              tvs->targetVariables);
201
0
    if(res != UA_STATUSCODE_GOOD) {
202
0
        UA_DataSetMetaDataType_clear(&metaData);
203
0
        return res;
204
0
    }
205
206
    /* Use the metadata from the sds */
207
0
    UA_DataSetMetaDataType_clear(&dsr->config.dataSetMetaData);
208
0
    dsr->config.dataSetMetaData = metaData;
209
210
    /* Set the backpointer from the sds */
211
0
    sds->connectedReader = dsr;
212
213
    /* Make the connection visible in the information model */
214
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
215
0
    return connectDataSetReaderToDataSet(psm->sc.server, dsr->head.identifier,
216
0
                                         sds->head.identifier);
217
#else
218
    return UA_STATUSCODE_GOOD;
219
#endif
220
0
}
221
222
UA_StatusCode
223
UA_DataSetReader_create(UA_PubSubManager *psm, UA_NodeId readerGroupIdentifier,
224
                        const UA_DataSetReaderConfig *dataSetReaderConfig,
225
0
                        UA_NodeId *readerIdentifier) {
226
0
    if(!psm || !dataSetReaderConfig)
227
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
228
229
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
230
231
    /* Search the reader group by the given readerGroupIdentifier */
232
0
    UA_ReaderGroup *rg = UA_ReaderGroup_find(psm, readerGroupIdentifier);
233
0
    if(!rg)
234
0
        return UA_STATUSCODE_BADNOTFOUND;
235
236
0
    if(UA_PubSubState_isEnabled(rg->head.state)) {
237
0
        UA_LOG_WARNING_PUBSUB(psm->logging, rg,
238
0
                              "Cannot add a DataSetReader while the "
239
0
                              "ReaderGroup is enabled");
240
0
        return UA_STATUSCODE_BADCONFIGURATIONERROR;
241
0
    }
242
243
    /* Allocate memory for new DataSetReader */
244
0
    UA_DataSetReader *dsr = (UA_DataSetReader *)
245
0
        UA_calloc(1, sizeof(UA_DataSetReader));
246
0
    if(!dsr)
247
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
248
249
0
    dsr->head.componentType = UA_PUBSUBCOMPONENT_DATASETREADER;
250
0
    dsr->linkedReaderGroup = rg;
251
252
    /* Add the new reader to the group. Add to the end of the linked list to
253
     * ensure the order for the realtime offsets is as expected. The received
254
     * DataSetMessages are matched via UA_DataSetReader_checkIdentifier for the
255
     * non-RT path. */
256
0
    UA_DataSetReader *after = LIST_FIRST(&rg->readers);
257
0
    if(!after) {
258
0
        LIST_INSERT_HEAD(&rg->readers, dsr, listEntry);
259
0
    } else {
260
0
        while(LIST_NEXT(after, listEntry))
261
0
            after = LIST_NEXT(after, listEntry);
262
0
        LIST_INSERT_AFTER(after, dsr, listEntry);
263
0
    }
264
0
    rg->readersCount++;
265
266
    /* Copy the config into the new dataSetReader */
267
0
    UA_StatusCode retVal =
268
0
        UA_DataSetReaderConfig_copy(dataSetReaderConfig, &dsr->config);
269
0
    if(retVal != UA_STATUSCODE_GOOD) {
270
0
        UA_DataSetReader_remove(psm, dsr);
271
0
        return retVal;
272
0
    }
273
274
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
275
0
    retVal = addDataSetReaderRepresentation(psm->sc.server, dsr);
276
0
    if(retVal != UA_STATUSCODE_GOOD) {
277
0
        UA_LOG_ERROR_PUBSUB(psm->logging, rg,
278
0
                            "Adding the DataSetReader to the information model failed");
279
0
        UA_DataSetReader_remove(psm, dsr);
280
0
        return retVal;
281
0
    }
282
#else
283
    UA_PubSubManager_generateUniqueNodeId(psm, &dsr->head.identifier);
284
#endif
285
286
    /* Cache the log string */
287
0
    char tmpLogIdStr[128];
288
0
    mp_snprintf(tmpLogIdStr, 128, "%SDataSetReader %N\t| ",
289
0
                dsr->linkedReaderGroup->head.logIdString, dsr->head.identifier);
290
0
    dsr->head.logIdString = UA_STRING_ALLOC(tmpLogIdStr);
291
292
    /* Connect to StandaloneSubscribedDataSet if a name is defined. Needs to be
293
     * added in the information model first, as this adds references to the
294
     * StandaloneSubscribedDataSet. */
295
0
    retVal = connectDSR2Standalone(psm, dsr);
296
0
    if(retVal != UA_STATUSCODE_GOOD) {
297
0
        UA_DataSetReader_remove(psm, dsr);
298
0
        return retVal;
299
0
    }
300
301
    /* Validate the config */
302
0
    retVal = validateDSRConfig(psm, dsr);
303
0
    if(retVal != UA_STATUSCODE_GOOD) {
304
0
        UA_DataSetReader_remove(psm, dsr);
305
0
        return retVal;
306
0
    }
307
308
    /* Notify the application that a new Reader was created.
309
     * This may internally adjust the config */
310
0
    UA_Server *server = psm->sc.server;
311
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
312
0
        UA_StatusCode res = server->config.pubSubConfig.
313
0
            componentLifecycleCallback(server, dsr->head.identifier,
314
0
                                       UA_PUBSUBCOMPONENT_DATASETREADER, false);
315
0
        if(res != UA_STATUSCODE_GOOD) {
316
0
            UA_DataSetReader_remove(psm, dsr);
317
0
            return res;
318
0
        }
319
0
    }
320
321
0
    UA_LOG_INFO_PUBSUB(psm->logging, dsr, "DataSetReader created (State: %s)",
322
0
                       UA_PubSubState_name(dsr->head.state));
323
324
0
    if(readerIdentifier)
325
0
        UA_NodeId_copy(&dsr->head.identifier, readerIdentifier);
326
327
    /* Enable the DataSetReader immediately if the enabled flag is set */
328
0
    if(dataSetReaderConfig->enabled)
329
0
        UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_OPERATIONAL,
330
0
                                        UA_STATUSCODE_GOOD);
331
332
0
    return UA_STATUSCODE_GOOD;
333
0
}
334
335
UA_StatusCode
336
0
UA_DataSetReader_remove(UA_PubSubManager *psm, UA_DataSetReader *dsr) {
337
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
338
339
0
    UA_ReaderGroup *rg = dsr->linkedReaderGroup;
340
0
    UA_assert(rg);
341
342
    /* Check if the ReaderGroup is enabled */
343
0
    if(UA_PubSubState_isEnabled(rg->head.state)) {
344
0
        UA_LOG_WARNING_PUBSUB(psm->logging, dsr,
345
0
                              "Removal of the DataSetReader not possible while "
346
0
                              "the ReaderGroup is enabled");
347
0
        return UA_STATUSCODE_BADINTERNALERROR;
348
0
    }
349
350
    /* Check with the application if we can remove */
351
0
    UA_Server *server = psm->sc.server;
352
0
    if(server->config.pubSubConfig.componentLifecycleCallback) {
353
0
        UA_StatusCode res = server->config.pubSubConfig.
354
0
            componentLifecycleCallback(server, dsr->head.identifier,
355
0
                                       UA_PUBSUBCOMPONENT_DATASETREADER, true);
356
0
        if(res != UA_STATUSCODE_GOOD)
357
0
            return res;
358
0
    }
359
360
    /* Disable and signal to the application */
361
0
    UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_DISABLED,
362
0
                                    UA_STATUSCODE_BADSHUTDOWN);
363
364
    /* Remove from information model */
365
0
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
366
0
    deleteNode(psm->sc.server, dsr->head.identifier, true);
367
0
#endif
368
369
    /* Check if a Standalone-SubscribedDataSet is associated with this reader
370
     * and disconnect it*/
371
0
    const UA_String sdsName = dsr->config.linkedStandaloneSubscribedDataSetName;
372
0
    UA_SubscribedDataSet *sds = (UA_String_isEmpty(&sdsName)) ?
373
0
        NULL : UA_SubscribedDataSet_findByName(psm, sdsName);
374
0
    if(sds && sds->connectedReader == dsr)
375
0
        sds->connectedReader = NULL;
376
377
    /* Remove DataSetReader from group */
378
0
    LIST_REMOVE(dsr, listEntry);
379
0
    rg->readersCount--;
380
381
0
    UA_LOG_INFO_PUBSUB(psm->logging, dsr, "DataSetReader deleted");
382
383
0
    UA_DataSetReaderConfig_clear(&dsr->config);
384
0
    UA_PubSubComponentHead_clear(&dsr->head);
385
0
    UA_free(dsr);
386
387
0
    return UA_STATUSCODE_GOOD;
388
0
}
389
390
UA_StatusCode
391
UA_DataSetReaderConfig_copy(const UA_DataSetReaderConfig *src,
392
0
                            UA_DataSetReaderConfig *dst) {
393
0
    memcpy(dst, src, sizeof(UA_DataSetReaderConfig));
394
0
    dst->writerGroupId = src->writerGroupId;
395
0
    dst->dataSetWriterId = src->dataSetWriterId;
396
0
    dst->dataSetFieldContentMask = src->dataSetFieldContentMask;
397
0
    dst->messageReceiveTimeout = src->messageReceiveTimeout;
398
399
0
    UA_StatusCode ret = UA_String_copy(&src->name, &dst->name);
400
0
    ret |= UA_PublisherId_copy(&src->publisherId, &dst->publisherId);
401
0
    ret |= UA_DataSetMetaDataType_copy(&src->dataSetMetaData, &dst->dataSetMetaData);
402
0
    ret |= UA_ExtensionObject_copy(&src->messageSettings, &dst->messageSettings);
403
0
    ret |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
404
0
    ret |= UA_String_copy(&src->linkedStandaloneSubscribedDataSetName,
405
0
                             &dst->linkedStandaloneSubscribedDataSetName);
406
407
0
    if(src->subscribedDataSetType == UA_PUBSUB_SDS_TARGET) {
408
0
        ret |= UA_TargetVariablesDataType_copy(&src->subscribedDataSet.target,
409
0
                                               &dst->subscribedDataSet.target);
410
0
    }
411
412
0
    if(ret != UA_STATUSCODE_GOOD)
413
0
        UA_DataSetReaderConfig_clear(dst);
414
415
0
    return ret;
416
0
}
417
418
void
419
0
UA_DataSetReaderConfig_clear(UA_DataSetReaderConfig *cfg) {
420
0
    UA_String_clear(&cfg->name);
421
0
    UA_String_clear(&cfg->linkedStandaloneSubscribedDataSetName);
422
0
    UA_PublisherId_clear(&cfg->publisherId);
423
0
    UA_DataSetMetaDataType_clear(&cfg->dataSetMetaData);
424
0
    UA_ExtensionObject_clear(&cfg->messageSettings);
425
0
    UA_ExtensionObject_clear(&cfg->transportSettings);
426
0
    if(cfg->subscribedDataSetType == UA_PUBSUB_SDS_TARGET) {
427
0
        UA_TargetVariablesDataType_clear(&cfg->subscribedDataSet.target);
428
0
    }
429
0
}
430
431
UA_StatusCode
432
UA_DataSetReader_setPubSubState(UA_PubSubManager *psm, UA_DataSetReader *dsr,
433
0
                                UA_PubSubState targetState, UA_StatusCode errorReason) {
434
0
    UA_ReaderGroup *rg = dsr->linkedReaderGroup;
435
0
    UA_assert(rg);
436
437
    /* Callback to modify the WriterGroup config and change the targetState
438
     * before the state machine executes */
439
0
    UA_Server *server = psm->sc.server;
440
0
    if(server->config.pubSubConfig.beforeStateChangeCallback) {
441
0
        server->config.pubSubConfig.
442
0
            beforeStateChangeCallback(server, dsr->head.identifier, &targetState);
443
0
    }
444
445
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
446
0
    UA_PubSubState oldState = dsr->head.state;
447
448
    /* Custom state machine */
449
0
    if(dsr->config.customStateMachine) {
450
0
        res = dsr->config.customStateMachine(server, dsr->head.identifier,
451
0
                                             dsr->config.context,
452
0
                                             &dsr->head.state, targetState);
453
0
        if(res != UA_STATUSCODE_GOOD)
454
0
            errorReason = res;
455
0
        goto finalize_state_machine;
456
0
    }
457
458
    /* Internal state machine */
459
0
    switch(targetState) {
460
        /* Disabled */
461
0
    case UA_PUBSUBSTATE_DISABLED:
462
0
    case UA_PUBSUBSTATE_ERROR:
463
0
        dsr->head.state = targetState;
464
0
        break;
465
466
        /* Enabled */
467
0
    case UA_PUBSUBSTATE_PAUSED:
468
0
    case UA_PUBSUBSTATE_PREOPERATIONAL:
469
0
    case UA_PUBSUBSTATE_OPERATIONAL:
470
0
        if(rg->head.state == UA_PUBSUBSTATE_DISABLED ||
471
0
           rg->head.state == UA_PUBSUBSTATE_ERROR ||
472
0
           rg->head.state == UA_PUBSUBSTATE_PAUSED) {
473
0
            dsr->head.state = UA_PUBSUBSTATE_PAUSED; /* RG is disabled -> paused */
474
0
        } else {
475
0
            dsr->head.state = rg->head.state; /* RG is enabled -> same state */
476
0
        }
477
0
        break;
478
479
0
    default:
480
0
        dsr->head.state = UA_PUBSUBSTATE_ERROR;
481
0
        res = UA_STATUSCODE_BADINTERNALERROR;
482
0
        errorReason = res;
483
0
        break;
484
0
    }
485
486
    /* Only keep the timeout callback if the reader is operational */
487
0
    if(dsr->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
488
0
       dsr->msgRcvTimeoutTimerId != 0) {
489
0
        UA_EventLoop *el = psm->sc.server->config.eventLoop;
490
0
        el->removeTimer(el, dsr->msgRcvTimeoutTimerId);
491
0
        dsr->msgRcvTimeoutTimerId = 0;
492
0
    }
493
494
0
 finalize_state_machine:
495
496
    /* No state change has happened */
497
0
    if(dsr->head.state == oldState)
498
0
        return res;
499
500
0
    UA_LOG_INFO_PUBSUB(psm->logging, dsr, "%s -> %s",
501
0
                       UA_PubSubState_name(oldState),
502
0
                       UA_PubSubState_name(dsr->head.state));
503
504
    /* Inform application about state change */
505
0
    if(server->config.pubSubConfig.stateChangeCallback)
506
0
        server->config.pubSubConfig.
507
0
            stateChangeCallback(server, dsr->head.identifier,
508
0
                                dsr->head.state, errorReason);
509
                                
510
0
    return res;
511
0
}
512
513
/* This Method is used to initially set the SubscribedDataSet to
514
 * TargetVariablesType and to create the list of target Variables of a
515
 * SubscribedDataSetType. */
516
UA_StatusCode
517
DataSetReader_createTargetVariables(UA_PubSubManager *psm, UA_DataSetReader *dsr,
518
0
                                    size_t tvsSize, const UA_FieldTargetDataType *tvs) {
519
0
    UA_LOCK_ASSERT(&psm->sc.server->serviceMutex);
520
521
0
    if(UA_PubSubState_isEnabled(dsr->head.state)) {
522
0
        UA_LOG_WARNING_PUBSUB(psm->logging, dsr,
523
0
                              "Cannot create Target Variables failed while "
524
0
                              "the DataSetReader is enabled");
525
0
        return UA_STATUSCODE_BADCONFIGURATIONERROR;
526
0
    }
527
528
0
    UA_TargetVariablesDataType newVars;
529
0
    UA_TargetVariablesDataType tmp = {tvsSize, (UA_FieldTargetDataType*)(uintptr_t)tvs};
530
0
    UA_StatusCode res = UA_TargetVariablesDataType_copy(&tmp, &newVars);   
531
0
    if(res != UA_STATUSCODE_GOOD)
532
0
        return res;
533
534
0
    UA_TargetVariablesDataType_clear(&dsr->config.subscribedDataSet.target);
535
0
    dsr->config.subscribedDataSet.target = newVars;
536
0
    return UA_STATUSCODE_GOOD;
537
0
}
538
539
static void
540
UA_DataSetReader_handleMessageReceiveTimeout(UA_PubSubManager *psm,
541
0
                                             UA_DataSetReader *dsr) {
542
0
    UA_assert(dsr->head.componentType == UA_PUBSUBCOMPONENT_DATASETREADER);
543
544
    /* Don't signal an error if we don't expect messages to arrive */
545
0
    if(dsr->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
546
0
       dsr->head.state != UA_PUBSUBSTATE_PREOPERATIONAL)
547
0
        return;
548
549
0
    UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "Message receive timeout occurred");
550
551
0
    lockServer(psm->sc.server);
552
0
    UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_ERROR,
553
0
                                    UA_STATUSCODE_BADTIMEOUT);
554
0
    unlockServer(psm->sc.server);
555
0
}
556
557
void
558
UA_DataSetReader_process(UA_PubSubManager *psm, UA_DataSetReader *dsr,
559
0
                         UA_DataSetMessage *msg) {
560
0
    if(!dsr || !msg || !psm)
561
0
        return;
562
563
0
    UA_LOG_DEBUG_PUBSUB(psm->logging, dsr, "Received a network message");
564
565
    /* Received a (first) message for the Reader.
566
     * Transition from PreOperational to Operational. */
567
0
    if(dsr->head.state == UA_PUBSUBSTATE_PREOPERATIONAL)
568
0
        UA_DataSetReader_setPubSubState(psm, dsr, dsr->head.state, UA_STATUSCODE_GOOD);
569
570
0
    if(dsr->head.state != UA_PUBSUBSTATE_OPERATIONAL &&
571
0
       dsr->head.state != UA_PUBSUBSTATE_PREOPERATIONAL) {
572
0
        UA_LOG_WARNING_PUBSUB(psm->logging, dsr,
573
0
                              "Received a network message but not operational");
574
0
        return;
575
0
    }
576
577
0
    if(!msg->header.dataSetMessageValid) {
578
0
        UA_LOG_INFO_PUBSUB(psm->logging, dsr,
579
0
                           "DataSetMessage is discarded: message is not valid");
580
0
        return;
581
0
    }
582
583
    /* TODO: Check ConfigurationVersion */
584
    /* if(msg->header.configVersionMajorVersionEnabled) {
585
     *     if(msg->header.configVersionMajorVersion !=
586
     *            dsr->config.dataSetMetaData.configurationVersion.majorVersion) {
587
     *         UA_LOG_WARNING(psm->logging, UA_LOGCATEGORY_SERVER,
588
     *                        "DataSetMessage is discarded: ConfigurationVersion "
589
     *                        "MajorVersion does not match");
590
     *         return;
591
     *     }
592
     * } */
593
594
0
    if(msg->header.dataSetMessageType != UA_DATASETMESSAGE_DATAKEYFRAME) {
595
0
        UA_LOG_WARNING_PUBSUB(psm->logging, dsr,
596
0
                              "DataSetMessage is discarded: Only keyframes are supported");
597
0
        return;
598
0
    }
599
600
    /* Configure / Update the timeout callback */
601
0
    if(dsr->config.messageReceiveTimeout > 0.0) {
602
0
        UA_EventLoop *el = psm->sc.server->config.eventLoop;
603
0
        if(dsr->msgRcvTimeoutTimerId == 0) {
604
0
            el->addTimer(el, (UA_Callback)UA_DataSetReader_handleMessageReceiveTimeout,
605
0
                         psm, dsr, dsr->config.messageReceiveTimeout, NULL,
606
0
                         UA_TIMERPOLICY_CURRENTTIME, &dsr->msgRcvTimeoutTimerId);
607
0
        } else {
608
            /* Reset the next execution time to now + interval */
609
0
            el->modifyTimer(el, dsr->msgRcvTimeoutTimerId,
610
0
                            dsr->config.messageReceiveTimeout, NULL,
611
0
                            UA_TIMERPOLICY_CURRENTTIME);
612
0
        }
613
0
    }
614
615
    /* Received a heartbeat with no fields */
616
0
    if(msg->fieldCount == 0)
617
0
        return;
618
619
    /* Check whether the field count matches the configuration */
620
0
    UA_TargetVariablesDataType *tvs = &dsr->config.subscribedDataSet.target;
621
0
    if(tvs->targetVariablesSize != msg->fieldCount) {
622
0
        UA_LOG_WARNING_PUBSUB(psm->logging, dsr,
623
0
                              "Number of fields does not match the "
624
0
                              "TargetVariables configuration");
625
0
        return;
626
0
    }
627
628
    /* Write the message fields. RT has the external data value configured. */
629
0
    UA_StatusCode res = UA_STATUSCODE_GOOD;
630
0
    for(size_t i = 0; i < msg->fieldCount; i++) {
631
0
        UA_FieldTargetDataType *tv = &tvs->targetVariables[i];
632
0
        UA_DataValue *field = &msg->data.keyFrameFields[i];
633
0
        if(!field->hasValue)
634
0
            continue;
635
636
        /* Write via the Write-Service */
637
0
        UA_WriteValue writeVal;
638
0
        UA_WriteValue_init(&writeVal);
639
0
        writeVal.attributeId = tv->attributeId;
640
0
        writeVal.indexRange = tv->receiverIndexRange;
641
0
        writeVal.nodeId = tv->targetNodeId;
642
0
        writeVal.value = *field;
643
0
        Operation_Write(psm->sc.server, &psm->sc.server->adminSession, &writeVal, &res);
644
0
        if(res != UA_STATUSCODE_GOOD)
645
0
            UA_LOG_INFO_PUBSUB(psm->logging, dsr,
646
0
                               "Error writing KeyFrame field %u: %s",
647
0
                               (unsigned)i, UA_StatusCode_name(res));
648
0
    }
649
0
}
650
651
/**************/
652
/* Server API */
653
/**************/
654
655
UA_StatusCode
656
UA_Server_addDataSetReader(UA_Server *server, UA_NodeId readerGroupId,
657
                           const UA_DataSetReaderConfig *config,
658
0
                           UA_NodeId *dsrId) {
659
0
    if(!server || !config)
660
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
661
0
    lockServer(server);
662
0
    UA_StatusCode res =
663
0
        UA_DataSetReader_create(getPSM(server), readerGroupId, config, dsrId);
664
0
    unlockServer(server);
665
0
    return res;
666
0
}
667
668
UA_StatusCode
669
0
UA_Server_removeDataSetReader(UA_Server *server, const UA_NodeId readerId) {
670
0
    if(!server)
671
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
672
0
    lockServer(server);
673
0
    UA_PubSubManager *psm = getPSM(server);
674
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, readerId);
675
0
    UA_StatusCode res = (dsr) ?
676
0
        UA_DataSetReader_remove(psm, dsr) : UA_STATUSCODE_BADNOTFOUND;
677
0
    unlockServer(server);
678
0
    return res;
679
0
}
680
681
UA_StatusCode
682
UA_Server_getDataSetReaderConfig(UA_Server *server, const UA_NodeId dsrId,
683
0
                                 UA_DataSetReaderConfig *config) {
684
0
    if(!server || !config)
685
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
686
0
    lockServer(server);
687
0
    UA_PubSubManager *psm = getPSM(server);
688
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dsrId);
689
0
    UA_StatusCode res = (dsr) ?
690
0
        UA_DataSetReaderConfig_copy(&dsr->config, config) : UA_STATUSCODE_BADNOTFOUND;
691
0
    unlockServer(server);
692
0
    return res;
693
0
}
694
695
UA_StatusCode
696
UA_Server_getDataSetReaderState(UA_Server *server, const UA_NodeId dsrId,
697
0
                                UA_PubSubState *state) {
698
0
    if(!server || !state)
699
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
700
0
    lockServer(server);
701
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(getPSM(server), dsrId);
702
0
    UA_StatusCode res = UA_STATUSCODE_BADNOTFOUND;
703
0
    if(dsr) {
704
0
        res = UA_STATUSCODE_GOOD;
705
0
        *state = dsr->head.state;
706
0
    }
707
0
    unlockServer(server);
708
0
    return res;
709
0
}
710
711
UA_StatusCode
712
0
UA_Server_enableDataSetReader(UA_Server *server, const UA_NodeId dsrId) {
713
0
    if(!server)
714
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
715
0
    lockServer(server);
716
0
    UA_StatusCode ret = UA_STATUSCODE_GOOD;
717
0
    UA_PubSubManager *psm = getPSM(server);
718
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dsrId);
719
0
    if(dsr)
720
0
        UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_OPERATIONAL,
721
0
                                        UA_STATUSCODE_GOOD);
722
0
    else
723
0
        ret = UA_STATUSCODE_BADNOTFOUND;
724
0
    unlockServer(server);
725
0
    return ret;
726
0
}
727
728
UA_StatusCode
729
0
UA_Server_disableDataSetReader(UA_Server *server, const UA_NodeId dsrId) {
730
0
    if(!server)
731
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
732
0
    lockServer(server);
733
0
    UA_StatusCode ret = UA_STATUSCODE_GOOD;
734
0
    UA_PubSubManager *psm = getPSM(server);
735
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dsrId);
736
0
    if(dsr)
737
0
        UA_DataSetReader_setPubSubState(psm, dsr, UA_PUBSUBSTATE_DISABLED,
738
0
                                        UA_STATUSCODE_GOOD);
739
0
    else
740
0
        ret = UA_STATUSCODE_BADNOTFOUND;
741
0
    unlockServer(server);
742
0
    return ret;
743
0
}
744
745
UA_StatusCode
746
UA_Server_setDataSetReaderTargetVariables(UA_Server *server, const UA_NodeId dsrId,
747
                                          size_t targetVariablesSize,
748
0
                                          const UA_FieldTargetDataType *targetVariables) {
749
0
    if(!server)
750
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
751
0
    lockServer(server);
752
0
    UA_PubSubManager *psm = getPSM(server);
753
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dsrId);
754
0
    UA_StatusCode res = (dsr) ?
755
0
        DataSetReader_createTargetVariables(psm, dsr, targetVariablesSize,
756
0
                                            targetVariables) : UA_STATUSCODE_BADNOTFOUND;
757
0
    unlockServer(server);
758
0
    return res;
759
0
}
760
761
UA_StatusCode
762
UA_Server_updateDataSetReaderConfig(UA_Server *server, const UA_NodeId dsrId,
763
0
                                    const UA_DataSetReaderConfig *config) {
764
0
    if(!server || !config)
765
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
766
767
0
    lockServer(server);
768
0
    UA_PubSubManager *psm = getPSM(server);
769
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dsrId);
770
0
    if(!dsr) {
771
0
        unlockServer(server);
772
0
        return UA_STATUSCODE_BADNOTFOUND;
773
0
    }
774
775
0
    if(UA_PubSubState_isEnabled(dsr->head.state)) {
776
0
        UA_LOG_ERROR_PUBSUB(psm->logging, dsr,
777
0
                            "The DataSetReader must be disabled to update the config");
778
0
        unlockServer(server);
779
0
        return UA_STATUSCODE_BADINTERNALERROR;
780
0
    }
781
782
    /* Store the old config */
783
0
    UA_DataSetReaderConfig oldConfig = dsr->config;
784
785
    /* Copy the config into the new dataSetReader */
786
0
    UA_StatusCode retVal = UA_DataSetReaderConfig_copy(config, &dsr->config);
787
0
    if(retVal != UA_STATUSCODE_GOOD)
788
0
        goto errout;
789
790
    /* Change the connection to a StandaloneSubscribedDataSet */
791
0
    if(!UA_String_equal(&dsr->config.linkedStandaloneSubscribedDataSetName,
792
0
                        &oldConfig.linkedStandaloneSubscribedDataSetName)) {
793
0
        disconnectDSR2Standalone(psm, dsr);
794
0
        retVal = connectDSR2Standalone(psm, dsr);
795
0
        if(retVal != UA_STATUSCODE_GOOD)
796
0
            goto errout;
797
0
    }
798
799
    /* Validate the new config */
800
0
    retVal = validateDSRConfig(psm, dsr);
801
0
    if(retVal != UA_STATUSCODE_GOOD)
802
0
        goto errout;
803
804
    /* Clean up and return */
805
0
    UA_DataSetReaderConfig_clear(&oldConfig);
806
0
    unlockServer(server);
807
0
    return UA_STATUSCODE_GOOD;
808
809
    /* Fall back to the old config */
810
0
 errout:
811
0
    UA_DataSetReaderConfig_clear(&dsr->config);
812
0
    dsr->config = oldConfig;
813
0
    unlockServer(server);
814
0
    return retVal;
815
0
}
816
817
/**********************/
818
/* Offset Computation */
819
/**********************/
820
821
static UA_StatusCode
822
UA_PubSubDataSetReader_generateKeyFrameMessage(UA_Server *server,
823
                                               UA_DataSetMessage *dsm,
824
0
                                               UA_DataSetReader *dsr) {
825
    /* Prepare DataSetMessageContent */
826
0
    UA_TargetVariablesDataType *tv = &dsr->config.subscribedDataSet.target;
827
0
    UA_DataSetMetaDataType *metaData = &dsr->config.dataSetMetaData;
828
0
    if(tv->targetVariablesSize != metaData->fieldsSize)
829
0
        metaData = NULL;
830
0
    dsm->header.dataSetMessageValid = true;
831
0
    dsm->header.dataSetMessageType = UA_DATASETMESSAGE_DATAKEYFRAME;
832
0
    dsm->fieldCount = (UA_UInt16) tv->targetVariablesSize;
833
0
    dsm->data.keyFrameFields = (UA_DataValue *)
834
0
            UA_Array_new(tv->targetVariablesSize, &UA_TYPES[UA_TYPES_DATAVALUE]);
835
0
    if(!dsm->data.keyFrameFields)
836
0
        return UA_STATUSCODE_BADOUTOFMEMORY;
837
838
0
     for(size_t counter = 0; counter < tv->targetVariablesSize; counter++) {
839
        /* Read the value and set the source in the reader config */
840
0
        UA_DataValue *dfv = &dsm->data.keyFrameFields[counter];
841
0
        UA_FieldTargetDataType *ftv = &tv->targetVariables[counter];
842
843
        /* Synthesize the field value from the FieldMetaData. This allows us to
844
         * prevent a read from the information model during startup. */
845
0
        UA_FieldMetaData *fieldMetaData = (metaData) ? &metaData->fields[counter] : NULL;
846
0
        if(fieldMetaData && fieldMetaData->valueRank == UA_VALUERANK_SCALAR) {
847
0
            const UA_DataType *type =
848
0
                UA_findDataTypeWithCustom(&fieldMetaData->dataType,
849
0
                                          server->config.customDataTypes);
850
0
            if(type == &UA_TYPES[UA_TYPES_STRING] && fieldMetaData->maxStringLength > 0) {
851
0
                UA_String *s = UA_String_new();
852
0
                if(!s) {
853
0
                    UA_DataSetMessage_clear(dsm);
854
0
                    return UA_STATUSCODE_BADOUTOFMEMORY;
855
0
                }
856
0
                s->data = (UA_Byte*)
857
0
                    UA_calloc(fieldMetaData->maxStringLength, sizeof(UA_Byte));
858
0
                if(!s->data) {
859
0
                    UA_free(s);
860
0
                    UA_DataSetMessage_clear(dsm);
861
0
                    return UA_STATUSCODE_BADOUTOFMEMORY;
862
0
                }
863
0
                s->length = fieldMetaData->maxStringLength;
864
0
                UA_Variant_setScalar(&dfv->value, s, type);
865
0
                dfv->hasValue = true;
866
0
            } else if(type && type->memSize < 512) {
867
0
                char buf[512];
868
0
                UA_init(buf, type);
869
0
                UA_StatusCode res = UA_Variant_setScalarCopy(&dfv->value, buf, type);
870
0
                if(res != UA_STATUSCODE_GOOD) {
871
0
                    UA_DataSetMessage_clear(dsm);
872
0
                    return res;
873
0
                }
874
0
                dfv->hasValue = true;
875
0
            }
876
0
        }
877
878
        /* Read the value from the information model */
879
0
        if(!dfv->hasValue) {
880
0
            UA_ReadValueId rvi;
881
0
            UA_ReadValueId_init(&rvi);
882
0
            rvi.nodeId = ftv->targetNodeId;
883
0
            rvi.attributeId = ftv->attributeId;
884
0
            rvi.indexRange = ftv->writeIndexRange;
885
0
            *dfv = readWithSession(server, &server->adminSession, &rvi,
886
0
                                   UA_TIMESTAMPSTORETURN_NEITHER);
887
0
        }
888
889
        /* Deactivate statuscode? */
890
0
        if(((u64)dsr->config.dataSetFieldContentMask &
891
0
            (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE) == 0)
892
0
            dfv->hasStatus = false;
893
894
        /* Deactivate timestamps */
895
0
        if(((u64)dsr->config.dataSetFieldContentMask &
896
0
            (u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP) == 0)
897
0
            dfv->hasSourceTimestamp = false;
898
0
        if(((u64)dsr->config.dataSetFieldContentMask &
899
0
            (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS) == 0)
900
0
            dfv->hasSourcePicoseconds = false;
901
0
        if(((u64)dsr->config.dataSetFieldContentMask &
902
0
            (u64)UA_DATASETFIELDCONTENTMASK_SERVERTIMESTAMP) == 0)
903
0
            dfv->hasServerTimestamp = false;
904
0
        if(((u64)dsr->config.dataSetFieldContentMask &
905
0
            (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS) == 0)
906
0
            dfv->hasServerPicoseconds = false;
907
0
    }
908
909
0
    return UA_STATUSCODE_GOOD;
910
0
}
911
912
/* Generate a DataSetMessage for the given reader. */
913
UA_StatusCode
914
UA_DataSetReader_generateDataSetMessage(UA_Server *server,
915
                                        UA_DataSetMessage *dsm,
916
0
                                        UA_DataSetReader *dsr) {
917
    /* Support only for UADP configuration
918
     * TODO: JSON encoding if UA_DataSetReader_generateDataSetMessage used other
919
     * that RT configuration */
920
921
0
    UA_ExtensionObject *settings = &dsr->config.messageSettings;
922
0
    if(settings->content.decoded.type != &UA_TYPES[UA_TYPES_UADPDATASETREADERMESSAGEDATATYPE])
923
0
        return UA_STATUSCODE_BADNOTSUPPORTED;
924
925
    /* The configuration Flags are included inside the std. defined
926
     * UA_UadpDataSetReaderMessageDataType */
927
0
    UA_UadpDataSetReaderMessageDataType defaultUadpConfiguration;
928
0
    UA_UadpDataSetReaderMessageDataType *dsrMessageDataType =
929
0
        (UA_UadpDataSetReaderMessageDataType*) settings->content.decoded.data;
930
931
0
    if(!(settings->encoding == UA_EXTENSIONOBJECT_DECODED ||
932
0
         settings->encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) ||
933
0
       !dsrMessageDataType->dataSetMessageContentMask) {
934
        /* Create default flag configuration if no dataSetMessageContentMask or
935
         * even messageSettings in UadpDataSetWriterMessageDataType was
936
         * passed. */
937
0
        memset(&defaultUadpConfiguration, 0, sizeof(UA_UadpDataSetReaderMessageDataType));
938
0
        defaultUadpConfiguration.dataSetMessageContentMask = (UA_UadpDataSetMessageContentMask)
939
0
            ((u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP |
940
0
             (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION |
941
0
             (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION);
942
0
        dsrMessageDataType = &defaultUadpConfiguration;
943
0
    }
944
945
    /* The field encoding depends on the flags inside the reader config. */
946
0
    if(dsr->config.dataSetFieldContentMask & (u64)UA_DATASETFIELDCONTENTMASK_RAWDATA) {
947
0
        dsm->header.fieldEncoding = UA_FIELDENCODING_RAWDATA;
948
0
    } else if((u64)dsr->config.dataSetFieldContentMask &
949
0
              ((u64)UA_DATASETFIELDCONTENTMASK_SOURCETIMESTAMP |
950
0
               (u64)UA_DATASETFIELDCONTENTMASK_SERVERPICOSECONDS |
951
0
               (u64)UA_DATASETFIELDCONTENTMASK_SOURCEPICOSECONDS |
952
0
               (u64)UA_DATASETFIELDCONTENTMASK_STATUSCODE)) {
953
0
        dsm->header.fieldEncoding = UA_FIELDENCODING_DATAVALUE;
954
0
    } else {
955
0
        dsm->header.fieldEncoding = UA_FIELDENCODING_VARIANT;
956
0
    }
957
958
    /* Std: 'The DataSetMessageContentMask defines the flags for the content
959
     * of the DataSetMessage header.' */
960
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
961
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_MAJORVERSION) {
962
0
        dsm->header.configVersionMajorVersionEnabled = true;
963
0
        dsm->header.configVersionMajorVersion =
964
0
            dsr->config.dataSetMetaData.configurationVersion.majorVersion;
965
0
    }
966
967
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
968
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_MINORVERSION) {
969
0
        dsm->header.configVersionMinorVersionEnabled = true;
970
0
        dsm->header.configVersionMinorVersion =
971
0
            dsr->config.dataSetMetaData.configurationVersion.minorVersion;
972
0
    }
973
974
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
975
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_SEQUENCENUMBER) {
976
        /* Will be modified when subscriber receives new nw msg */
977
0
        dsm->header.dataSetMessageSequenceNrEnabled = true;
978
0
        dsm->header.dataSetMessageSequenceNr = 1;
979
0
    }
980
981
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
982
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_TIMESTAMP) {
983
0
        dsm->header.timestampEnabled = true;
984
0
        dsm->header.timestamp = UA_DateTime_now();
985
0
    }
986
987
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
988
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_PICOSECONDS)
989
0
        dsm->header.picoSecondsIncluded = false;
990
991
0
    if((u64)dsrMessageDataType->dataSetMessageContentMask &
992
0
       (u64)UA_UADPDATASETMESSAGECONTENTMASK_STATUS)
993
0
        dsm->header.statusEnabled = true;
994
995
    /* Not supported for Delta frames atm */
996
0
    return UA_PubSubDataSetReader_generateKeyFrameMessage(server, dsm, dsr);
997
0
}
998
999
1000
UA_StatusCode
1001
UA_Server_computeDataSetReaderOffsetTable(UA_Server *server,
1002
                                          const UA_NodeId dataSetReaderId,
1003
0
                                          UA_PubSubOffsetTable *ot) {
1004
    /* Validate the arguments */
1005
0
    if(!server || !ot)
1006
0
        return UA_STATUSCODE_BADINVALIDARGUMENT;
1007
1008
0
    lockServer(server);
1009
1010
    /* Get the DataSetReader */
1011
0
    UA_PubSubManager *psm = getPSM(server);
1012
0
    UA_DataSetReader *dsr = UA_DataSetReader_find(psm, dataSetReaderId);
1013
0
    if(!dsr) {
1014
0
        unlockServer(server);
1015
0
        return UA_STATUSCODE_BADNOTFOUND;
1016
0
    }
1017
1018
    /* Generate the DataSetMessage */
1019
0
    UA_DataSetMessage dsm;
1020
0
    memset(&dsm, 0, sizeof(UA_DataSetMessage));
1021
0
    UA_StatusCode res = UA_DataSetReader_generateDataSetMessage(server, &dsm, dsr);
1022
0
    if(res != UA_STATUSCODE_GOOD) {
1023
0
        unlockServer(server);
1024
0
        return res;
1025
0
    }
1026
1027
    /* Reset the OffsetTable */
1028
0
    memset(ot, 0, sizeof(UA_PubSubOffsetTable));
1029
1030
    /* Prepare the encoding context */
1031
0
    UA_DataSetMessage_EncodingMetaData emd;
1032
0
    memset(&emd, 0, sizeof(UA_DataSetMessage_EncodingMetaData));
1033
0
    emd.dataSetWriterId = dsr->config.dataSetWriterId;
1034
0
    emd.fields = dsr->config.dataSetMetaData.fields;
1035
0
    emd.fieldsSize = dsr->config.dataSetMetaData.fieldsSize;
1036
1037
0
    PubSubEncodeCtx ctx;
1038
0
    memset(&ctx, 0, sizeof(PubSubEncodeCtx));
1039
0
    ctx.ot = ot;
1040
0
    ctx.eo.metaData = &emd;
1041
0
    ctx.eo.metaDataSize = 1;
1042
1043
    /* Compute the offset */
1044
0
    size_t fieldindex = 0;
1045
0
    UA_FieldTargetDataType *tv = NULL;
1046
0
    size_t msgSize = UA_DataSetMessage_calcSizeBinary(&ctx, &emd, &dsm, 0);
1047
0
    if(msgSize == 0) {
1048
0
        res = UA_STATUSCODE_BADINTERNALERROR;
1049
0
        goto errout;
1050
0
    }
1051
1052
    /* Allocate the message */
1053
0
    res = UA_ByteString_allocBuffer(&ot->networkMessage, msgSize);
1054
0
    if(res != UA_STATUSCODE_GOOD)
1055
0
        goto errout;
1056
1057
    /* Create the ByteString of the encoded DataSetMessage */
1058
0
    ctx.ctx.pos = ot->networkMessage.data;
1059
0
    ctx.ctx.end = ot->networkMessage.data + ot->networkMessage.length;
1060
0
    res = UA_DataSetMessage_encodeBinary(&ctx, &emd, &dsm);
1061
0
    if(res != UA_STATUSCODE_GOOD)
1062
0
        goto errout;
1063
1064
    /* Pick up the component NodeIds */
1065
0
    for(size_t i = 0; i < ot->offsetsSize; i++) {
1066
0
        UA_PubSubOffset *o = &ot->offsets[i];
1067
0
        switch(o->offsetType) {
1068
0
        case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER:
1069
0
        case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_STATUS:
1070
0
        case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_TIMESTAMP:
1071
0
        case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_PICOSECONDS:
1072
0
            res |= UA_NodeId_copy(&dsr->head.identifier, &o->component);
1073
0
            break;
1074
0
        case UA_PUBSUBOFFSETTYPE_DATASETFIELD_DATAVALUE:
1075
0
            tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
1076
0
            res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
1077
0
            fieldindex++;
1078
0
            break;
1079
0
        case UA_PUBSUBOFFSETTYPE_DATASETFIELD_VARIANT:
1080
0
            tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
1081
0
            res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
1082
0
            fieldindex++;
1083
0
            break;
1084
0
        case UA_PUBSUBOFFSETTYPE_DATASETFIELD_RAW:
1085
0
            tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
1086
0
            res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
1087
0
            fieldindex++;
1088
0
            break;
1089
0
        default:
1090
0
            res = UA_STATUSCODE_BADINTERNALERROR;
1091
0
            break;
1092
0
        }
1093
0
    }
1094
1095
    /* Clean up */
1096
0
 errout:
1097
0
    UA_DataSetMessage_clear(&dsm);
1098
0
    if(res != UA_STATUSCODE_GOOD)
1099
0
        UA_PubSubOffsetTable_clear(ot);
1100
0
    unlockServer(server);
1101
0
    return res;
1102
0
}
1103
1104
#endif /* UA_ENABLE_PUBSUB */