/src/pdns/pdns/dnsdistdist/channel.hh
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * This file is part of PowerDNS or dnsdist. |
3 | | * Copyright -- PowerDNS.COM B.V. and its contributors |
4 | | * |
5 | | * This program is free software; you can redistribute it and/or modify |
6 | | * it under the terms of version 2 of the GNU General Public License as |
7 | | * published by the Free Software Foundation. |
8 | | * |
9 | | * In addition, for the avoidance of any doubt, permission is granted to |
10 | | * link this program with OpenSSL and to (re)distribute the binaries |
11 | | * produced as the result of such linking. |
12 | | * |
13 | | * This program is distributed in the hope that it will be useful, |
14 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
15 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
16 | | * GNU General Public License for more details. |
17 | | * |
18 | | * You should have received a copy of the GNU General Public License |
19 | | * along with this program; if not, write to the Free Software |
20 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
21 | | */ |
22 | | #pragma once |
23 | | #include <memory> |
24 | | #include <optional> |
25 | | |
26 | | #include "misc.hh" |
27 | | |
28 | | /* g++ defines __SANITIZE_THREAD__ |
29 | | clang++ supports the nice __has_feature(thread_sanitizer), |
30 | | let's merge them */ |
31 | | #if defined(__has_feature) |
32 | | #if __has_feature(thread_sanitizer) |
33 | | #define __SANITIZE_THREAD__ 1 |
34 | | #endif |
35 | | #endif |
36 | | |
37 | | #if __SANITIZE_THREAD__ |
38 | | #if defined __has_include |
39 | | #if __has_include(<sanitizer/tsan_interface.h>) |
40 | | #include <sanitizer/tsan_interface.h> |
41 | | #else /* __has_include(<sanitizer/tsan_interface.h>) */ |
42 | | extern "C" void __tsan_acquire(void* addr); |
43 | | extern "C" void __tsan_release(void* addr); |
44 | | #endif /* __has_include(<sanitizer/tsan_interface.h>) */ |
45 | | #else /* defined __has_include */ |
46 | | extern "C" void __tsan_acquire(void* addr); |
47 | | extern "C" void __tsan_release(void* addr); |
48 | | #endif /* defined __has_include */ |
49 | | #endif /* __SANITIZE_THREAD__ */ |
50 | | |
51 | | namespace pdns |
52 | | { |
53 | | namespace channel |
54 | | { |
/**
 * \brief Selects whether the sending end of an object queue blocks on write.
 */
enum class SenderBlockingMode
{
  /** the sending end of the pipe is switched to non-blocking mode */
  SenderNonBlocking,
  /** the sending end of the pipe is left in its default, blocking mode */
  SenderBlocking,
};
/**
 * \brief Selects whether the receiving end of an object queue blocks on read.
 */
enum class ReceiverBlockingMode
{
  /** the receiving end of the pipe is switched to non-blocking mode */
  ReceiverNonBlocking,
  /** the receiving end of the pipe is left in its default, blocking mode */
  ReceiverBlocking,
};
65 | | |
66 | | /** |
67 | | * The sender's end of a channel used to pass objects between threads. |
68 | | * |
69 | | * A sender can be used by several threads in a safe way. |
70 | | */ |
71 | | template <typename T, typename D = std::default_delete<T>> |
72 | | class Sender |
73 | | { |
74 | | public: |
75 | | Sender() = default; |
76 | | Sender(FDWrapper&& descriptor) : |
77 | | d_fd(std::move(descriptor)) |
78 | | { |
79 | | } |
80 | | Sender(const Sender&) = delete; |
81 | | Sender& operator=(const Sender&) = delete; |
82 | | Sender(Sender&&) = default; |
83 | | Sender& operator=(Sender&&) = default; |
84 | | ~Sender() = default; |
85 | | /** |
86 | | * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode. |
87 | | * |
88 | | * \return True if the object was properly sent, False if the channel is full. |
89 | | * |
90 | | * \throw runtime_error if the channel is broken, for example if the other end has been closed. |
91 | | */ |
92 | | bool send(std::unique_ptr<T, D>&&) const; |
93 | | void close(); |
94 | | |
95 | | private: |
96 | | FDWrapper d_fd; |
97 | | }; |
98 | | |
99 | | /** |
100 | | * The receiver's end of a channel used to pass objects between threads. |
101 | | * |
102 | | * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen. |
103 | | */ |
104 | | template <typename T, typename D = std::default_delete<T>> |
105 | | class Receiver |
106 | | { |
107 | | public: |
108 | | Receiver() = default; |
109 | | Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) : |
110 | | d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF) |
111 | | { |
112 | | } |
113 | | Receiver(const Receiver&) = delete; |
114 | | Receiver& operator=(const Receiver&) = delete; |
115 | | Receiver(Receiver&&) = default; |
116 | | Receiver& operator=(Receiver&&) = default; |
117 | | ~Receiver() = default; |
118 | | /** |
119 | | * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode. |
120 | | * |
121 | | * \return An object if one was available, and std::nullopt otherwise. |
122 | | * |
123 | | * \throw runtime_error if the channel is broken, for example if the other end has been closed. |
124 | | */ |
125 | | std::optional<std::unique_ptr<T, D>> receive(); |
126 | | std::optional<std::unique_ptr<T, D>> receive(D deleter); |
127 | | |
128 | | /** |
129 | | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available. |
130 | | * |
131 | | * \return A valid descriptor or -1 if the Receiver was not properly initialized. |
132 | | */ |
133 | | int getDescriptor() const |
134 | | { |
135 | | return d_fd.getHandle(); |
136 | | } |
137 | | /** |
138 | | * \brief Whether the remote end has closed the channel. |
139 | | */ |
140 | | bool isClosed() const |
141 | | { |
142 | | return d_closed; |
143 | | } |
144 | | |
145 | | private: |
146 | | FDWrapper d_fd; |
147 | | bool d_closed{false}; |
148 | | bool d_throwOnEOF{true}; |
149 | | }; |
150 | | |
151 | | /** |
152 | | * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers. |
153 | | * |
154 | | * \return A pair of Sender and Receiver objects. |
155 | | * |
156 | | * \throw runtime_error if the channel creation failed. |
157 | | */ |
158 | | template <typename T, typename D = std::default_delete<T>> |
159 | | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
160 | | |
161 | | /** |
162 | | * The notifier's end of a channel used to communicate between threads. |
163 | | * |
164 | | * A notifier can be used by several threads in a safe way. |
165 | | */ |
166 | | class Notifier |
167 | | { |
168 | | public: |
169 | | Notifier() = default; |
170 | | Notifier(FDWrapper&&); |
171 | | Notifier(const Notifier&) = delete; |
172 | | Notifier& operator=(const Notifier&) = delete; |
173 | 0 | Notifier(Notifier&&) = default; |
174 | | Notifier& operator=(Notifier&&) = default; |
175 | 0 | ~Notifier() = default; |
176 | | |
177 | | /** |
178 | | * \brief Queue a notification to wake up the other end of the channel. |
179 | | * |
180 | | * \return True if the notification was properly sent, False if the channel is full. |
181 | | * |
182 | | * \throw runtime_error if the channel is broken, for example if the other end has been closed. |
183 | | */ |
184 | | bool notify() const; |
185 | | |
186 | | private: |
187 | | FDWrapper d_fd; |
188 | | }; |
189 | | |
190 | | /** |
191 | | * The waiter's end of a channel used to communicate between threads. |
192 | | * |
193 | | * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen. |
194 | | */ |
195 | | class Waiter |
196 | | { |
197 | | public: |
198 | | Waiter() = default; |
199 | | Waiter(FDWrapper&&, bool throwOnEOF = true); |
200 | | Waiter(const Waiter&) = delete; |
201 | | Waiter& operator=(const Waiter&) = delete; |
202 | 0 | Waiter(Waiter&&) = default; |
203 | | Waiter& operator=(Waiter&&) = default; |
204 | 0 | ~Waiter() = default; |
205 | | |
206 | | /** |
207 | | * \brief Clear all notifications queued on that channel, if any. |
208 | | */ |
209 | | void clear(); |
210 | | /** |
211 | | * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive. |
212 | | * |
213 | | * \return A valid descriptor or -1 if the Waiter was not properly initialized. |
214 | | */ |
215 | | int getDescriptor() const; |
216 | | /** |
217 | | * \brief Whether the remote end has closed the channel. |
218 | | */ |
219 | | bool isClosed() const |
220 | 0 | { |
221 | 0 | return d_closed; |
222 | 0 | } |
223 | | |
224 | | private: |
225 | | FDWrapper d_fd; |
226 | | bool d_closed{false}; |
227 | | bool d_throwOnEOF{true}; |
228 | | }; |
229 | | |
230 | | /** |
231 | | * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers. |
232 | | * |
233 | | * \return A pair of Notifier and Sender objects. |
234 | | * |
235 | | * \throw runtime_error if the channel creation failed. |
236 | | */ |
237 | | std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true); |
238 | | |
239 | | template <typename T, typename D> |
240 | | bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const |
241 | | { |
242 | | /* we cannot touch the initial unique pointer after writing to the pipe, |
243 | | not even to release it, so let's transfer it to a local object */ |
244 | | auto localObj = std::move(object); |
245 | | auto ptr = localObj.get(); |
246 | | static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail"); |
247 | | while (true) { |
248 | | #if __SANITIZE_THREAD__ |
249 | | __tsan_release(ptr); |
250 | | #endif /* __SANITIZE_THREAD__ */ |
251 | | ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr)); |
252 | | |
253 | | if (sent == sizeof(ptr)) { |
254 | | // coverity[leaked_storage] |
255 | | localObj.release(); |
256 | | return true; |
257 | | } |
258 | | else if (sent == 0) { |
259 | | #if __SANITIZE_THREAD__ |
260 | | __tsan_acquire(ptr); |
261 | | #endif /* __SANITIZE_THREAD__ */ |
262 | | throw std::runtime_error("Unable to write to channel: remote end has been closed"); |
263 | | } |
264 | | else { |
265 | | #if __SANITIZE_THREAD__ |
266 | | __tsan_acquire(ptr); |
267 | | #endif /* __SANITIZE_THREAD__ */ |
268 | | if (errno == EINTR) { |
269 | | continue; |
270 | | } |
271 | | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
272 | | object = std::move(localObj); |
273 | | return false; |
274 | | } |
275 | | else { |
276 | | throw std::runtime_error("Unable to write to channel:" + stringerror()); |
277 | | } |
278 | | } |
279 | | } |
280 | | } |
281 | | |
282 | | template <typename T, typename D> |
283 | | void Sender<T, D>::close() |
284 | | { |
285 | | d_fd.reset(); |
286 | | } |
287 | | |
288 | | template <typename T, typename D> |
289 | | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive() |
290 | | { |
291 | | return receive(D()); |
292 | | } |
293 | | |
294 | | template <typename T, typename D> |
295 | | std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter) |
296 | | { |
297 | | while (true) { |
298 | | std::optional<std::unique_ptr<T, D>> result; |
299 | | T* objPtr{nullptr}; |
300 | | ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr)); |
301 | | if (got == sizeof(objPtr)) { |
302 | | #if __SANITIZE_THREAD__ |
303 | | __tsan_acquire(objPtr); |
304 | | #endif /* __SANITIZE_THREAD__ */ |
305 | | return std::unique_ptr<T, D>(objPtr, deleter); |
306 | | } |
307 | | else if (got == 0) { |
308 | | d_closed = true; |
309 | | if (!d_throwOnEOF) { |
310 | | return result; |
311 | | } |
312 | | throw std::runtime_error("EOF while reading from Channel receiver"); |
313 | | } |
314 | | else if (got == -1) { |
315 | | if (errno == EINTR) { |
316 | | continue; |
317 | | } |
318 | | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
319 | | return result; |
320 | | } |
321 | | throw std::runtime_error("Error while reading from Channel receiver: " + stringerror()); |
322 | | } |
323 | | else { |
324 | | throw std::runtime_error("Partial read from Channel receiver"); |
325 | | } |
326 | | } |
327 | | } |
328 | | |
329 | | template <typename T, typename D> |
330 | | std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF) |
331 | | { |
332 | | int fds[2] = {-1, -1}; |
333 | | if (pipe(fds) < 0) { |
334 | | throw std::runtime_error("Error creating channel pipe: " + stringerror()); |
335 | | } |
336 | | |
337 | | FDWrapper sender(fds[1]); |
338 | | FDWrapper receiver(fds[0]); |
339 | | if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) { |
340 | | int err = errno; |
341 | | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); |
342 | | } |
343 | | |
344 | | if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) { |
345 | | int err = errno; |
346 | | throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err)); |
347 | | } |
348 | | |
349 | | if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) { |
350 | | setPipeBufferSize(receiver.getHandle(), pipeBufferSize); |
351 | | } |
352 | | |
353 | | return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)}; |
354 | | } |
355 | | } |
356 | | } |