Coverage Report

Created: 2025-06-13 06:46

/src/Fast-DDS/src/cpp/rtps/writer/ChangeForReader.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ChangeForReader.hpp
17
 */
18
19
#ifndef RTPS_WRITER__CHANGEFORREADER_HPP
20
#define RTPS_WRITER__CHANGEFORREADER_HPP
21
22
#include <fastdds/rtps/common/CacheChange.hpp>
23
#include <fastdds/rtps/common/FragmentNumber.hpp>
24
#include <fastdds/rtps/common/SequenceNumber.hpp>
25
26
#include <cassert>
27
28
namespace eprosima {
29
namespace fastdds {
30
namespace rtps {
31
32
/**
 * Enum ChangeForReaderStatus_t.
 *
 * Lists the states a CacheChange_t can be in with respect to a ReaderProxy.
 * The numeric value of every enumerator is fixed explicitly.
 *  @ingroup COMMON_MODULE
 */
enum ChangeForReaderStatus_t
{
    //! UNSENT
    UNSENT = 0,
    //! REQUESTED
    REQUESTED = 1,
    //! UNACKNOWLEDGED
    UNACKNOWLEDGED = 2,
    //! ACKNOWLEDGED
    ACKNOWLEDGED = 3,
    //! UNDERWAY
    UNDERWAY = 4
};
44
45
/**
46
 * Struct ChangeForReader_t used to represent the state of a specific change with respect to a specific reader, as well as its relevance.
47
 *  @ingroup COMMON_MODULE
48
 */
49
class ChangeForReader_t
50
{
51
    friend struct ChangeForReaderCmp;
52
53
public:
54
55
    explicit ChangeForReader_t(
56
            CacheChange_t* change)
57
0
        : status_(UNSENT)
58
0
        , seq_num_(change->sequenceNumber)
59
0
        , change_(change)
60
0
    {
61
0
        if (change->getFragmentSize() != 0)
62
0
        {
63
0
            unsent_fragments_.base(1u);
64
0
            unsent_fragments_.add_range(1u, change->getFragmentCount() + 1u);
65
0
        }
66
0
    }
67
68
    /**
69
     * Get the cache change
70
     * @return Cache change
71
     */
72
    CacheChange_t* getChange() const
73
0
    {
74
0
        return change_;
75
0
    }
76
77
    void setStatus(
78
            const ChangeForReaderStatus_t status)
79
0
    {
80
0
        status_ = status;
81
0
    }
82
83
    ChangeForReaderStatus_t getStatus() const
84
0
    {
85
0
        return status_;
86
0
    }
87
88
    const SequenceNumber_t getSequenceNumber() const
89
0
    {
90
0
        return seq_num_;
91
0
    }
92
93
    FragmentNumber_t get_next_unsent_fragment() const
94
0
    {
95
0
        if (unsent_fragments_.empty())
96
0
        {
97
0
            return change_->getFragmentCount() + 1;
98
0
        }
99
100
0
        return unsent_fragments_.min();
101
0
    }
102
103
    FragmentNumberSet_t getUnsentFragments() const
104
0
    {
105
0
        return unsent_fragments_;
106
0
    }
107
108
    void markAllFragmentsAsUnsent()
109
0
    {
110
0
        assert(nullptr != change_);
111
112
0
        if (change_->getFragmentSize() != 0)
113
0
        {
114
0
            unsent_fragments_.base(1u);
115
0
            unsent_fragments_.add_range(1u, change_->getFragmentCount() + 1u);
116
0
        }
117
0
    }
118
119
    void markFragmentsAsSent(
120
            const FragmentNumber_t& sentFragment)
121
0
    {
122
0
        unsent_fragments_.remove(sentFragment);
123
124
        // We only use the running window mechanism during the first stage, until all fragments have been delivered
125
        // once, and we consider the whole change as delivered.
126
0
        if (!delivered_ && !unsent_fragments_.empty() && (unsent_fragments_.max() < change_->getFragmentCount()))
127
0
        {
128
0
            FragmentNumber_t base = unsent_fragments_.base();
129
0
            FragmentNumber_t max = unsent_fragments_.max();
130
0
            assert(!unsent_fragments_.is_set(base));
131
132
            // Update base to first bit set
133
0
            base = unsent_fragments_.min();
134
0
            unsent_fragments_.base_update(base);
135
136
            // Add all possible fragments
137
0
            unsent_fragments_.add_range(max + 1u, change_->getFragmentCount() + 1u);
138
0
        }
139
0
    }
140
141
    void markFragmentsAsUnsent(
142
            const FragmentNumberSet_t& unsentFragments)
143
0
    {
144
        // Ignore NACK_FRAG messages during the first stage, until all fragments have been delivered once, and we
145
        // consider the whole change as delivered.
146
0
        if (delivered_)
147
0
        {
148
0
            if (unsent_fragments_.empty())
149
0
            {
150
                // Current window is empty, so we can set it to the received one.
151
0
                unsent_fragments_ = unsentFragments;
152
0
            }
153
0
            else
154
0
            {
155
                // Update window to send the lowest possible requested fragments first.
156
0
                FragmentNumber_t other_base = unsentFragments.base();
157
0
                if (other_base < unsent_fragments_.base())
158
0
                {
159
0
                    unsent_fragments_.base_update(other_base);
160
0
                }
161
0
                unsentFragments.for_each(
162
0
                    [this](
163
0
                        FragmentNumber_t element)
164
0
                    {
165
0
                        unsent_fragments_.add(element);
166
0
                    });
167
0
            }
168
0
        }
169
0
    }
170
171
    bool has_been_delivered() const
172
0
    {
173
0
        return delivered_;
174
0
    }
175
176
    void set_delivered()
177
0
    {
178
0
        delivered_ = true;
179
0
    }
180
181
private:
182
183
    //!Status
184
    ChangeForReaderStatus_t status_;
185
186
    //!Sequence number
187
    SequenceNumber_t seq_num_;
188
189
    CacheChange_t* change_;
190
191
    FragmentNumberSet_t unsent_fragments_;
192
193
    //! Indicates if was delivered at least once.
194
    bool delivered_ = false;
195
};
196
197
struct ChangeForReaderCmp
198
{
199
    bool operator ()(
200
            const ChangeForReader_t& a,
201
            const ChangeForReader_t& b) const
202
0
    {
203
0
        return a.seq_num_ < b.seq_num_;
204
0
    }
205
206
};
207
208
} // namespace rtps
209
} // namespace fastdds
210
} // namespace eprosima
211
212
#endif // RTPS_WRITER__CHANGEFORREADER_HPP