Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/rtps/DataSharing/DataSharingNotification.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DataSharingNotification.hpp
17
 */
18
19
#ifndef RTPS_HISTORY_DATASHARINGNOTIFICATION_HPP
20
#define RTPS_HISTORY_DATASHARINGNOTIFICATION_HPP
21
22
#include <fastdds/dds/log/Log.hpp>
23
#include <rtps/history/PoolConfig.h>
24
#include <utils/shared_memory/SharedMemSegment.hpp>
25
#include <utils/shared_memory/SharedDir.hpp>
26
#include <fastdds/rtps/common/Guid.hpp>
27
28
#include <memory>
29
#include <vector>
30
#include <mutex>
31
#include <atomic>
32
33
namespace eprosima {
34
namespace fastdds {
35
namespace rtps {
36
37
class DataSharingNotification
38
{
39
40
    friend class DataSharingListener;
41
    friend class DataSharingNotifier;
42
43
public:
44
45
    typedef fastdds::rtps::SharedSegmentBase Segment;
46
47
0
    DataSharingNotification() = default;
48
49
0
    virtual ~DataSharingNotification() = default;
50
51
    /**
52
     * Notifies of new data
53
     */
54
    inline void notify()
55
0
    {
56
0
        try
57
0
        {
58
0
            std::unique_lock<Segment::mutex> lock(notification_->notification_mutex);
59
0
            notification_->new_data.store(true);
60
0
            lock.unlock();
61
0
            notification_->notification_cv.notify_all();
62
0
        }
63
0
        catch (const boost::interprocess::interprocess_exception& /*e*/)
64
0
        {
65
            // Timeout when locking
66
0
        }
67
0
    }
68
69
    /**
70
     * Returns the GUID of the reader listening to the notifications
71
     */
72
    inline const GUID_t&  reader() const
73
0
    {
74
0
        return segment_id_;
75
0
    }
76
77
    static std::shared_ptr<DataSharingNotification> create_notification(
78
            const GUID_t& reader_guid,
79
            const std::string& shared_dir = std::string());
80
81
    static std::shared_ptr<DataSharingNotification> open_notification(
82
            const GUID_t& reader_guid,
83
            const std::string& shared_dir = std::string());
84
85
    void destroy();
86
87
    static std::string get_default_directory()
88
0
    {
89
0
        std::string dir;
90
0
        fastdds::rtps::SharedDir::get_default_shared_dir(dir);
91
0
        return dir;
92
0
    }
93
94
    constexpr static const char* domain_name()
95
0
    {
96
0
        return "fast_datasharing";
97
0
    }
98
99
protected:
100
101
#pragma warning(push)
102
#pragma warning(disable:4324)
103
    struct alignas (8) Notification
104
    {
105
        //! CV to wait for new notifications
106
        Segment::condition_variable notification_cv;
107
108
        //! synchronization mutex
109
        Segment::mutex notification_mutex;
110
111
        //! New data available
112
        std::atomic<bool> new_data;
113
    };
114
#pragma warning(pop)
115
116
    static std::string generate_segment_name(
117
            const std::string& shared_dir,
118
            const GUID_t& reader_guid)
119
0
    {
120
0
        std::stringstream ss;
121
0
        if (!shared_dir.empty())
122
0
        {
123
0
            ss << shared_dir << "/";
124
0
        }
125
0
        ss << DataSharingNotification::domain_name() << "_" << reader_guid.guidPrefix << "_" << reader_guid.entityId;
126
0
        return ss.str();
127
0
    }
128
129
    bool create_and_init_notification(
130
            const GUID_t& reader_guid,
131
            const std::string& shared_dir = std::string());
132
133
    bool open_and_init_notification(
134
            const GUID_t& reader_guid,
135
            const std::string& shared_dir = std::string());
136
137
    template <typename T>
138
    bool create_and_init_shared_segment_notification(
139
            const GUID_t& reader_guid,
140
            const std::string& shared_dir)
141
0
    {
142
0
        segment_id_ = reader_guid;
143
0
        segment_name_ = generate_segment_name(shared_dir, reader_guid);
144
0
        std::unique_ptr<T> local_segment;
145
146
0
        try
147
0
        {
148
0
            uint32_t per_allocation_extra_size = T::compute_per_allocation_extra_size(
149
0
                alignof(Notification), DataSharingNotification::domain_name());
150
0
            uint32_t segment_size = static_cast<uint32_t>(sizeof(Notification)) + per_allocation_extra_size;
151
152
            //Open the segment
153
0
            T::remove(segment_name_);
154
155
0
            local_segment.reset(
156
0
                new T(boost::interprocess::create_only,
157
0
                segment_name_,
158
0
                segment_size + T::EXTRA_SEGMENT_SIZE));
159
0
        }
160
0
        catch (const std::exception& e)
161
0
        {
162
0
            EPROSIMA_LOG_ERROR(HISTORY_DATASHARING_LISTENER, "Failed to create segment " << segment_name_
163
0
                                                                                         << ": " << e.what());
164
0
            return false;
165
0
        }
166
167
0
        try
168
0
        {
169
            // Alloc and initialize the Node
170
0
            notification_ = local_segment->get().template construct<Notification>("notification_node")();
171
0
            notification_->new_data.store(false);
172
0
        }
173
0
        catch (std::exception& e)
174
0
        {
175
0
            T::remove(segment_name_);
176
177
0
            EPROSIMA_LOG_ERROR(HISTORY_DATASHARING_LISTENER, "Failed to create listener queue " << segment_name_
178
0
                                                                                                << ": " << e.what());
179
0
            return false;
180
0
        }
181
182
0
        segment_ = std::move(local_segment);
183
0
        owned_ = true;
184
0
        return true;
185
0
    }
Unexecuted instantiation: bool eprosima::fastdds::rtps::DataSharingNotification::create_and_init_shared_segment_notification<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> >(eprosima::fastdds::rtps::GUID_t const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
Unexecuted instantiation: bool eprosima::fastdds::rtps::DataSharingNotification::create_and_init_shared_segment_notification<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_mapped_file<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::file_mapping> >(eprosima::fastdds::rtps::GUID_t const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
186
187
    template <typename T>
188
    bool open_and_init_shared_segment_notification(
189
            const GUID_t& reader_guid,
190
            const std::string& shared_dir)
191
0
    {
192
0
        segment_id_ = reader_guid;
193
0
        segment_name_ = generate_segment_name(shared_dir, reader_guid);
194
195
        //Open the segment
196
0
        std::unique_ptr<T> local_segment;
197
0
        try
198
0
        {
199
0
            local_segment = std::unique_ptr<T>(
200
0
                new T(boost::interprocess::open_only,
201
0
                segment_name_.c_str()));
202
0
        }
203
0
        catch (const std::exception& e)
204
0
        {
205
0
            EPROSIMA_LOG_ERROR(HISTORY_DATASHARING_LISTENER, "Failed to open segment " << segment_name_
206
0
                                                                                       << ": " << e.what());
207
0
            return false;
208
0
        }
209
210
        // Initialize values from the segment
211
0
        notification_ = (local_segment->get().template find<Notification>(
212
0
                    "notification_node")).first;
213
0
        if (!notification_)
214
0
        {
215
0
            local_segment.reset();
216
217
0
            EPROSIMA_LOG_ERROR(HISTORY_DATASHARING_LISTENER, "Failed to open listener queue " << segment_name_);
218
0
            return false;
219
0
        }
220
221
0
        segment_ = std::move(local_segment);
222
0
        return true;
223
0
    }
Unexecuted instantiation: bool eprosima::fastdds::rtps::DataSharingNotification::open_and_init_shared_segment_notification<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> >(eprosima::fastdds::rtps::GUID_t const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
Unexecuted instantiation: bool eprosima::fastdds::rtps::DataSharingNotification::open_and_init_shared_segment_notification<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_mapped_file<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::file_mapping> >(eprosima::fastdds::rtps::GUID_t const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
224
225
    GUID_t segment_id_;         //< The ID of the segment is the GUID of the reader
226
    std::string segment_name_;  //< Segment name
227
228
    std::unique_ptr<Segment> segment_;  //< Shared memory segment
229
    Notification* notification_;        //< The notification data
230
    bool owned_ = false;                //< Whether the shared segment is owned by this instance
231
};
232
233
}  // namespace rtps
234
}  // namespace fastdds
235
}  // namespace eprosima
236
237
#endif  // RTPS_DATASHARING_DATASHARINGNOTIFICATION_HPP