Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/writer/ChangeForReader.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ChangeForReader.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_CHANGEFORREADER_H_
20
#define _FASTDDS_RTPS_CHANGEFORREADER_H_
21
22
#include <fastdds/rtps/common/CacheChange.h>
23
#include <fastdds/rtps/common/FragmentNumber.h>
24
#include <fastdds/rtps/common/SequenceNumber.h>
25
26
#include <cassert>
27
28
namespace eprosima {
29
namespace fastrtps {
30
namespace rtps {
31
32
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
33
34
/**
35
 * Enum ChangeForReaderStatus_t, possible states for a CacheChange_t in a ReaderProxy.
36
 *  @ingroup COMMON_MODULE
37
 */
38
enum ChangeForReaderStatus_t
39
{
40
    UNSENT = 0,                    //!< UNSENT
41
    REQUESTED = 1,                 //!< REQUESTED
42
    UNACKNOWLEDGED = 2,            //!< UNACKNOWLEDGED
43
    ACKNOWLEDGED = 3,              //!< ACKNOWLEDGED
44
    UNDERWAY = 4                   //!< UNDERWAY
45
};
46
47
/**
48
 * Struct ChangeForReader_t used to represent the state of a specific change with respect to a specific reader, as well as its relevance.
49
 *  @ingroup COMMON_MODULE
50
 */
51
class ChangeForReader_t
52
{
53
    friend struct ChangeForReaderCmp;
54
55
public:
56
57
    explicit ChangeForReader_t(
58
            CacheChange_t* change)
59
        : status_(UNSENT)
60
        , seq_num_(change->sequenceNumber)
61
        , change_(change)
62
0
    {
63
0
        if (change->getFragmentSize() != 0)
64
0
        {
65
0
            unsent_fragments_.base(1u);
66
0
            unsent_fragments_.add_range(1u, change->getFragmentCount() + 1u);
67
0
        }
68
0
    }
69
70
    /**
71
     * Get the cache change
72
     * @return Cache change
73
     */
74
    CacheChange_t* getChange() const
75
0
    {
76
0
        return change_;
77
0
    }
78
79
    void setStatus(
80
            const ChangeForReaderStatus_t status)
81
0
    {
82
0
        status_ = status;
83
0
    }
84
85
    ChangeForReaderStatus_t getStatus() const
86
0
    {
87
0
        return status_;
88
0
    }
89
90
    const SequenceNumber_t getSequenceNumber() const
91
0
    {
92
0
        return seq_num_;
93
0
    }
94
95
    FragmentNumber_t get_next_unsent_fragment() const
96
0
    {
97
0
        if (unsent_fragments_.empty())
98
0
        {
99
0
            return change_->getFragmentCount() + 1;
100
0
        }
101
0
102
0
        return unsent_fragments_.min();
103
0
    }
104
105
    FragmentNumberSet_t getUnsentFragments() const
106
0
    {
107
0
        return unsent_fragments_;
108
0
    }
109
110
    void markAllFragmentsAsUnsent()
111
0
    {
112
0
        assert(nullptr != change_);
113
0
114
0
        if (change_->getFragmentSize() != 0)
115
0
        {
116
0
            unsent_fragments_.base(1u);
117
0
            unsent_fragments_.add_range(1u, change_->getFragmentCount() + 1u);
118
0
        }
119
0
    }
120
121
    void markFragmentsAsSent(
122
            const FragmentNumber_t& sentFragment)
123
0
    {
124
0
        unsent_fragments_.remove(sentFragment);
125
0
126
0
        // We only use the running window mechanism during the first stage, until all fragments have been delivered
127
0
        // once, and we consider the whole change as delivered.
128
0
        if (!delivered_ && !unsent_fragments_.empty() && (unsent_fragments_.max() < change_->getFragmentCount()))
129
0
        {
130
0
            FragmentNumber_t base = unsent_fragments_.base();
131
0
            FragmentNumber_t max = unsent_fragments_.max();
132
0
            assert(!unsent_fragments_.is_set(base));
133
0
134
0
            // Update base to first bit set
135
0
            base = unsent_fragments_.min();
136
0
            unsent_fragments_.base_update(base);
137
0
138
0
            // Add all possible fragments
139
0
            unsent_fragments_.add_range(max + 1u, change_->getFragmentCount() + 1u);
140
0
        }
141
0
    }
142
143
    void markFragmentsAsUnsent(
144
            const FragmentNumberSet_t& unsentFragments)
145
0
    {
146
0
        // Ignore NACK_FRAG messages during the first stage, until all fragments have been delivered once, and we
147
0
        // consider the whole change as delivered.
148
0
        if (delivered_)
149
0
        {
150
0
            if (unsent_fragments_.empty())
151
0
            {
152
0
                // Current window is empty, so we can set it to the received one.
153
0
                unsent_fragments_ = unsentFragments;
154
0
            }
155
0
            else
156
0
            {
157
0
                // Update window to send the lowest possible requested fragments first.
158
0
                FragmentNumber_t other_base = unsentFragments.base();
159
0
                if (other_base < unsent_fragments_.base())
160
0
                {
161
0
                    unsent_fragments_.base_update(other_base);
162
0
                }
163
0
                unsentFragments.for_each(
164
0
                    [this](
165
0
                        FragmentNumber_t element)
166
0
                    {
167
0
                        unsent_fragments_.add(element);
168
0
                    });
169
0
            }
170
0
        }
171
0
    }
172
173
    bool has_been_delivered() const
174
0
    {
175
0
        return delivered_;
176
0
    }
177
178
    void set_delivered()
179
0
    {
180
0
        delivered_ = true;
181
0
    }
182
183
private:
184
185
    //!Status
186
    ChangeForReaderStatus_t status_;
187
188
    //!Sequence number
189
    SequenceNumber_t seq_num_;
190
191
    CacheChange_t* change_;
192
193
    FragmentNumberSet_t unsent_fragments_;
194
195
    //! Indicates if was delivered at least once.
196
    bool delivered_ = false;
197
};
198
199
struct ChangeForReaderCmp
200
{
201
    bool operator ()(
202
            const ChangeForReader_t& a,
203
            const ChangeForReader_t& b) const
204
0
    {
205
0
        return a.seq_num_ < b.seq_num_;
206
0
    }
207
208
};
209
210
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
211
212
} // namespace rtps
213
} // namespace fastrtps
214
} // namespace eprosima
215
216
#endif /* _FASTDDS_RTPS_CHANGEFORREADER_H_ */