Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/rtps/messages/submessages/DataMsg.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * DataMsg.hpp
17
 *
18
 */
19
20
#include <fastdds/dds/core/policy/ParameterTypes.hpp>
21
22
#include <fastdds/core/policy/ParameterSerializer.hpp>
23
#include <fastdds/core/policy/ParameterList.hpp>
24
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
25
26
using NetworkBuffer = eprosima::fastdds::rtps::NetworkBuffer;
27
28
namespace eprosima {
29
namespace fastdds {
30
namespace rtps {
31
32
namespace {
33
34
struct DataMsgUtils
35
{
36
    static void prepare_submessage_flags(
37
            const CacheChange_t* change,
38
            TopicKind_t topicKind,
39
            bool expectsInlineQos,
40
            InlineQosWriter* inlineQos,
41
            bool& dataFlag,
42
            bool& keyFlag,
43
            bool& inlineQosFlag,
44
            octet& status)
45
0
    {
46
0
        inlineQosFlag =
47
0
                (nullptr != inlineQos) ||
48
0
                ((WITH_KEY == topicKind) &&
49
0
                (!change->writerGUID.is_builtin() || expectsInlineQos || change->kind != ALIVE)) ||
50
0
                (change->write_params.related_sample_identity() != SampleIdentity::unknown()) ||
51
0
                (change->write_params.has_more_replies());
52
53
0
        dataFlag = ALIVE == change->kind &&
54
0
                change->serializedPayload.length > 0 && nullptr != change->serializedPayload.data;
55
0
        keyFlag = !dataFlag && !inlineQosFlag && (WITH_KEY == topicKind);
56
57
0
        status = 0;
58
0
        if (change->kind == NOT_ALIVE_DISPOSED)
59
0
        {
60
0
            status = status | BIT(0);
61
0
        }
62
0
        if (change->kind == NOT_ALIVE_UNREGISTERED)
63
0
        {
64
0
            status = status | BIT(1);
65
0
        }
66
0
        if (change->kind == NOT_ALIVE_DISPOSED_UNREGISTERED)
67
0
        {
68
0
            status = status | BIT(0);
69
0
            status = status | BIT(1);
70
0
        }
71
0
    }
72
73
    static bool serialize_header(
74
            uint32_t fragment_number,
75
            CDRMessage_t* msg,
76
            const CacheChange_t* change,
77
            const EntityId_t& readerId,
78
            octet flags,
79
            uint32_t& submessage_size_pos,
80
            uint32_t& position_size_count_size)
81
0
    {
82
0
        bool ok = true;
83
0
        bool is_fragment = fragment_number > 0;
84
85
0
        CDRMessage::addOctet(msg, is_fragment ? DATA_FRAG : DATA);
86
0
        CDRMessage::addOctet(msg, flags);
87
0
        submessage_size_pos = msg->pos;
88
0
        CDRMessage::addUInt16(msg, 0);
89
0
        position_size_count_size = msg->pos;
90
91
        //extra flags. not in this version.
92
0
        ok &= CDRMessage::addUInt16(msg, 0);
93
        //octet to inline Qos is 12 or 28, may change in future versions
94
0
        ok &= is_fragment ?
95
0
                CDRMessage::addUInt16(msg, RTPSMESSAGE_OCTETSTOINLINEQOS_DATAFRAGSUBMSG) :
96
0
                CDRMessage::addUInt16(msg, RTPSMESSAGE_OCTETSTOINLINEQOS_DATASUBMSG);
97
        //Entity ids
98
0
        ok &= CDRMessage::addEntityId(msg, &readerId);
99
0
        ok &= CDRMessage::addEntityId(msg, &change->writerGUID.entityId);
100
        //Add Sequence Number
101
0
        ok &= CDRMessage::addSequenceNumber(msg, &change->sequenceNumber);
102
103
0
        if (is_fragment)
104
0
        {
105
            // Add fragment starting number
106
0
            ok &= CDRMessage::addUInt32(msg, fragment_number); // fragments start in 1
107
108
            // Add fragments in submessage
109
0
            ok &= CDRMessage::addUInt16(msg, 1); // we are sending one fragment
110
111
            // Add fragment size
112
0
            ok &= CDRMessage::addUInt16(msg, change->getFragmentSize());
113
114
            // Add total sample size
115
0
            ok &= CDRMessage::addUInt32(msg, change->serializedPayload.length);
116
0
        }
117
118
0
        return ok;
119
0
    }
120
121
    static void serialize_inline_qos(
122
            CDRMessage_t* msg,
123
            const CacheChange_t* change,
124
            TopicKind_t topicKind,
125
            bool expectsInlineQos,
126
            InlineQosWriter* inlineQos,
127
            octet status)
128
0
    {
129
0
        if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
130
0
        {
131
0
            fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_sample_identity(msg,
132
0
                    change->write_params.related_sample_identity());
133
0
            fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_custom_related_sample_identity(
134
0
                msg,
135
0
                change->write_params.related_sample_identity());
136
0
        }
137
138
0
        if (change->write_params.has_more_replies())
139
0
        {
140
0
            fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_more_replies(msg);
141
0
        }
142
143
0
        if (WITH_KEY == topicKind && (!change->writerGUID.is_builtin() || expectsInlineQos || ALIVE != change->kind))
144
0
        {
145
0
            fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_key(msg,
146
0
                    change->instanceHandle);
147
148
0
            if (ALIVE != change->kind)
149
0
            {
150
0
                fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_status(msg, status);
151
0
            }
152
0
        }
153
154
0
        if (inlineQos != nullptr)
155
0
        {
156
0
            inlineQos->writeQosToCDRMessage(msg);
157
0
        }
158
159
0
        fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_sentinel(msg);
160
0
    }
161
162
};
163
164
}  // empty namespace
165
166
bool RTPSMessageCreator::addMessageData(
167
        CDRMessage_t* msg,
168
        GuidPrefix_t& guidprefix,
169
        const CacheChange_t* change,
170
        TopicKind_t topicKind,
171
        const EntityId_t& readerId,
172
        bool expectsInlineQos,
173
        InlineQosWriter* inlineQos)
174
0
{
175
176
0
    RTPSMessageCreator::addHeader(msg, guidprefix);
177
178
0
    RTPSMessageCreator::addSubmessageInfoTS_Now(msg, false);
179
180
0
    NetworkBuffer pending_buffer;
181
0
    uint8_t pending_padding = 0;
182
183
0
    bool is_big_submessage;
184
0
    RTPSMessageCreator::addSubmessageData(msg, change, topicKind, readerId, expectsInlineQos, inlineQos,
185
0
            is_big_submessage, true, pending_buffer, pending_padding);
186
187
0
    msg->length = msg->pos;
188
189
0
    return true;
190
0
}
191
192
bool RTPSMessageCreator::addSubmessageData(
193
        CDRMessage_t* msg,
194
        const CacheChange_t* change,
195
        TopicKind_t topicKind,
196
        const EntityId_t& readerId,
197
        bool expectsInlineQos,
198
        InlineQosWriter* inlineQos,
199
        bool& is_big_submessage,
200
        bool copy_data,
201
        NetworkBuffer& pending_buffer,
202
        uint8_t& pending_padding)
203
0
{
204
0
    octet status = 0;
205
0
    octet flags = 0;
206
207
    // Initialize output parameters
208
0
    is_big_submessage = false;
209
0
    pending_buffer = NetworkBuffer();
210
0
    pending_padding = 0;
211
212
    //Find out flags
213
0
    bool dataFlag = false;
214
0
    bool keyFlag = false;
215
0
    bool inlineQosFlag = false;
216
217
0
    Endianness_t old_endianess = msg->msg_endian;
218
#if FASTDDS_IS_BIG_ENDIAN_TARGET
219
    msg->msg_endian = BIGEND;
220
#else
221
0
    flags = flags | BIT(0);
222
0
    msg->msg_endian = LITTLEEND;
223
0
#endif // if FASTDDS_IS_BIG_ENDIAN_TARGET
224
225
0
    DataMsgUtils::prepare_submessage_flags(change, topicKind, expectsInlineQos, inlineQos,
226
0
            dataFlag, keyFlag, inlineQosFlag, status);
227
228
0
    if (inlineQosFlag)
229
0
    {
230
0
        flags = flags | BIT(1);
231
0
    }
232
233
0
    if (dataFlag)
234
0
    {
235
0
        flags = flags | BIT(2);
236
0
    }
237
238
0
    if (keyFlag)
239
0
    {
240
0
        flags = flags | BIT(3);
241
0
    }
242
243
    // Submessage header.
244
0
    uint32_t submessage_size_pos = 0;
245
0
    uint16_t submessage_size = 0;
246
0
    uint32_t position_size_count_size = 0;
247
0
    bool added_no_error = DataMsgUtils::serialize_header(0, msg, change, readerId, flags,
248
0
                    submessage_size_pos, position_size_count_size);
249
250
    //Add INLINE QOS AND SERIALIZED PAYLOAD DEPENDING ON FLAGS:
251
0
    if (inlineQosFlag) //inlineQoS
252
0
    {
253
0
        DataMsgUtils::serialize_inline_qos(msg, change, topicKind, expectsInlineQos, inlineQos, status);
254
0
    }
255
256
    //Add Serialized Payload
257
0
    if (dataFlag)
258
0
    {
259
0
        if (copy_data)
260
0
        {
261
0
            added_no_error &=
262
0
                    CDRMessage::addData(msg, change->serializedPayload.data, change->serializedPayload.length);
263
0
        }
264
0
        else if (msg->pos + change->serializedPayload.length > msg->max_size)
265
0
        {
266
0
            return false;
267
0
        }
268
0
        else
269
0
        {
270
0
            pending_buffer = NetworkBuffer(change->serializedPayload.data, change->serializedPayload.length);
271
0
            msg->pos += pending_buffer.size;
272
0
        }
273
0
    }
274
275
0
    if (keyFlag)
276
0
    {
277
0
        added_no_error &= CDRMessage::addOctet(msg, 0); //ENCAPSULATION
278
0
        if (msg->msg_endian == BIGEND)
279
0
        {
280
0
            added_no_error &= CDRMessage::addOctet(msg, PL_CDR_BE); //ENCAPSULATION
281
0
        }
282
0
        else
283
0
        {
284
0
            added_no_error &= CDRMessage::addOctet(msg, PL_CDR_LE); //ENCAPSULATION
285
0
        }
286
287
0
        added_no_error &= CDRMessage::addUInt16(msg, 0); //ENCAPSULATION OPTIONS
288
0
        added_no_error &=
289
0
                fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_key(msg,
290
0
                        change->instanceHandle);
291
0
        added_no_error &=
292
0
                fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_status(msg,
293
0
                        status);
294
0
        added_no_error &= fastdds::dds::ParameterSerializer<fastdds::dds::Parameter_t>::add_parameter_sentinel(msg);
295
0
    }
296
297
    // Align submessage to rtps alignment (4).
298
0
    uint8_t align = (4 - msg->pos % 4) & 3;
299
0
    if (copy_data)
300
0
    {
301
0
        for (uint32_t count = 0; count < align; ++count)
302
0
        {
303
0
            added_no_error &= CDRMessage::addOctet(msg, 0);
304
0
        }
305
0
    }
306
0
    else
307
0
    {
308
0
        pending_padding = align;
309
0
        msg->pos += align;
310
0
    }
311
312
0
    uint32_t size32 = msg->pos - position_size_count_size;
313
0
    if (size32 <= std::numeric_limits<uint16_t>::max())
314
0
    {
315
0
        submessage_size = static_cast<uint16_t>(size32);
316
0
        octet* o = reinterpret_cast<octet*>(&submessage_size);
317
0
        if (msg->msg_endian == DEFAULT_ENDIAN)
318
0
        {
319
0
            msg->buffer[submessage_size_pos] = *(o);
320
0
            msg->buffer[submessage_size_pos + 1] = *(o + 1);
321
0
        }
322
0
        else
323
0
        {
324
0
            msg->buffer[submessage_size_pos] = *(o + 1);
325
0
            msg->buffer[submessage_size_pos + 1] = *(o);
326
0
        }
327
0
    }
328
0
    else
329
0
    {
330
        // Submessage > 64 KB
331
0
        is_big_submessage = true;
332
0
    }
333
334
    // Rewind position when not copying data. Needed for size checks.
335
0
    if (!copy_data)
336
0
    {
337
0
        msg->pos -= pending_padding;
338
0
        msg->pos -= pending_buffer.size;
339
0
    }
340
341
0
    msg->msg_endian = old_endianess;
342
343
0
    return added_no_error;
344
0
}
345
346
bool RTPSMessageCreator::addMessageDataFrag(
347
        CDRMessage_t* msg,
348
        GuidPrefix_t& guidprefix,
349
        const CacheChange_t* change,
350
        uint32_t fragment_number,
351
        TopicKind_t topicKind,
352
        const EntityId_t& readerId,
353
        bool expectsInlineQos,
354
        InlineQosWriter* inlineQos)
355
0
{
356
0
    RTPSMessageCreator::addHeader(msg, guidprefix);
357
358
0
    RTPSMessageCreator::addSubmessageInfoTS_Now(msg, false);
359
360
    // Calculate fragment start
361
0
    uint32_t fragment_start = change->getFragmentSize() * (fragment_number - 1);
362
    // Calculate fragment size. If last fragment, size may be smaller
363
0
    uint32_t fragment_size = fragment_number < change->getFragmentCount() ? change->getFragmentSize() :
364
0
            change->serializedPayload.length - fragment_start;
365
366
    // TODO (Ricardo). Check to create special wrapper.
367
0
    SerializedPayload_t payload;
368
0
    payload.data = change->serializedPayload.data + fragment_start;
369
0
    payload.length = fragment_size;
370
371
0
    NetworkBuffer pending_buffer;
372
0
    uint8_t pending_padding = 0;
373
374
0
    RTPSMessageCreator::addSubmessageDataFrag(msg, change, fragment_number, payload,
375
0
            topicKind, readerId, expectsInlineQos, inlineQos, true, pending_buffer, pending_padding);
376
377
0
    payload.data = NULL;
378
379
0
    msg->length = msg->pos;
380
0
    return true;
381
0
}
382
383
bool RTPSMessageCreator::addSubmessageDataFrag(
384
        CDRMessage_t* msg,
385
        const CacheChange_t* change,
386
        uint32_t fragment_number,
387
        const SerializedPayload_t& payload,
388
        TopicKind_t topicKind,
389
        const EntityId_t& readerId,
390
        bool expectsInlineQos,
391
        InlineQosWriter* inlineQos,
392
        bool copy_data,
393
        NetworkBuffer& pending_buffer,
394
        uint8_t& pending_padding)
395
0
{
396
0
    octet status = 0;
397
0
    octet flags = 0;
398
    //Find out flags
399
0
    bool dataFlag = false;
400
0
    bool keyFlag = false;
401
0
    bool inlineQosFlag = false;
402
403
    // Initialize output parameters
404
0
    pending_buffer = NetworkBuffer();
405
0
    pending_padding = 0;
406
407
0
    Endianness_t old_endianess = msg->msg_endian;
408
#if FASTDDS_IS_BIG_ENDIAN_TARGET
409
    msg->msg_endian = BIGEND;
410
#else
411
0
    flags = flags | BIT(0);
412
0
    msg->msg_endian = LITTLEEND;
413
0
#endif // if FASTDDS_IS_BIG_ENDIAN_TARGET
414
415
0
    DataMsgUtils::prepare_submessage_flags(change, topicKind, expectsInlineQos, inlineQos,
416
0
            dataFlag, keyFlag, inlineQosFlag, status);
417
418
0
    if (inlineQosFlag)
419
0
    {
420
0
        flags = flags | BIT(1);
421
0
    }
422
423
0
    if (keyFlag)
424
0
    {
425
0
        flags = flags | BIT(2);
426
0
    }
427
428
    // Submessage header.
429
0
    uint32_t submessage_size_pos = 0;
430
0
    uint16_t submessage_size = 0;
431
0
    uint32_t position_size_count_size = 0;
432
0
    bool added_no_error = DataMsgUtils::serialize_header(fragment_number, msg, change, readerId, flags,
433
0
                    submessage_size_pos, position_size_count_size);
434
435
    //Add INLINE QOS AND SERIALIZED PAYLOAD DEPENDING ON FLAGS:
436
0
    if (inlineQosFlag) //inlineQoS
437
0
    {
438
0
        DataMsgUtils::serialize_inline_qos(msg, change, topicKind, expectsInlineQos, inlineQos, status);
439
0
    }
440
441
    //Add Serialized Payload XXX TODO
442
0
    if (!keyFlag) // keyflag = 0 means that the serializedPayload SubmessageElement contains the serialized Data
443
0
    {
444
0
        if (copy_data)
445
0
        {
446
0
            added_no_error &= CDRMessage::addData(msg, payload.data, payload.length);
447
0
        }
448
0
        else if (msg->pos + payload.length > msg->max_size)
449
0
        {
450
0
            return false;
451
0
        }
452
0
        else
453
0
        {
454
0
            pending_buffer = NetworkBuffer(payload.data, payload.length);
455
0
            msg->pos += pending_buffer.size;
456
0
        }
457
0
    }
458
0
    else
459
0
    {
460
        // keyflag = 1 means that the serializedPayload SubmessageElement contains the serialized Key
461
        /*
462
            added_no_error &= CDRMessage::addOctet(&submsgElem, 0); //ENCAPSULATION
463
            if (submsgElem.msg_endian == BIGEND)
464
            added_no_error &= CDRMessage::addOctet(&submsgElem, PL_CDR_BE); //ENCAPSULATION
465
            else
466
            added_no_error &= CDRMessage::addOctet(&submsgElem, PL_CDR_LE); //ENCAPSULATION
467
468
            added_no_error &= CDRMessage::addUInt16(&submsgElem, 0); //ENCAPSULATION OPTIONS
469
            added_no_error &= Parameter_t::addParameterKey(&submsgElem, &change->instanceHandle);
470
            added_no_error &= Parameter_t::addParameterStatus(&submsgElem, status);
471
            added_no_error &= Parameter_t::addParameterSentinel(&submsgElem);
472
         */
473
0
        msg->msg_endian = old_endianess;
474
0
        return false;
475
0
    }
476
477
    // TODO(Ricardo) This should be on cachechange.
478
    // Align submessage to rtps alignment (4).
479
0
    submessage_size = uint16_t(msg->pos - position_size_count_size);
480
0
    for (; 0 != (submessage_size & 3); ++submessage_size)
481
0
    {
482
0
        if (copy_data)
483
0
        {
484
0
            added_no_error &= CDRMessage::addOctet(msg, 0);
485
0
        }
486
0
        else
487
0
        {
488
0
            ++pending_padding;
489
0
        }
490
0
    }
491
492
0
    if (!copy_data)
493
0
    {
494
0
        msg->pos -= pending_buffer.size;
495
0
    }
496
497
    //TODO(Ricardo) Improve.
498
0
    octet* o = reinterpret_cast<octet*>(&submessage_size);
499
0
    if (msg->msg_endian == DEFAULT_ENDIAN)
500
0
    {
501
0
        msg->buffer[submessage_size_pos] = *(o);
502
0
        msg->buffer[submessage_size_pos + 1] = *(o + 1);
503
0
    }
504
0
    else
505
0
    {
506
0
        msg->buffer[submessage_size_pos] = *(o + 1);
507
0
        msg->buffer[submessage_size_pos + 1] = *(o);
508
0
    }
509
510
0
    msg->msg_endian = old_endianess;
511
512
0
    return added_no_error;
513
0
}
514
515
} // namespace rtps
516
} // namespace fastdds
517
} // namespace eprosima