Coverage Report

Created: 2025-08-28 07:00

/src/pdns/pdns/dnsdistdist/channel.hh
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#pragma once
23
#include <memory>
24
#include <optional>
25
26
#include "misc.hh"
27
28
/* g++ defines __SANITIZE_THREAD__
29
   clang++ supports the nice __has_feature(thread_sanitizer),
30
   let's merge them */
31
#if defined(__has_feature)
32
#if __has_feature(thread_sanitizer)
33
#define __SANITIZE_THREAD__ 1
34
#endif
35
#endif
36
37
#if __SANITIZE_THREAD__
38
#if defined __has_include
39
#if __has_include(<sanitizer/tsan_interface.h>)
40
#include <sanitizer/tsan_interface.h>
41
#else /* __has_include(<sanitizer/tsan_interface.h>) */
42
extern "C" void __tsan_acquire(void* addr);
43
extern "C" void __tsan_release(void* addr);
44
#endif /* __has_include(<sanitizer/tsan_interface.h>) */
45
#else /* defined __has_include */
46
extern "C" void __tsan_acquire(void* addr);
47
extern "C" void __tsan_release(void* addr);
48
#endif /* defined __has_include */
49
#endif /* __SANITIZE_THREAD__ */
50
51
namespace pdns
52
{
53
namespace channel
54
{
55
  /* Blocking behaviour requested for the sending end of an object queue:
     createObjectQueue() switches the sender's descriptor to non-blocking
     mode only when SenderNonBlocking is selected. */
  enum class SenderBlockingMode
  {
    SenderNonBlocking = 0,
    SenderBlocking = 1
  };
60
  /* Blocking behaviour requested for the receiving end of an object queue:
     createObjectQueue() switches the receiver's descriptor to non-blocking
     mode only when ReceiverNonBlocking is selected. */
  enum class ReceiverBlockingMode
  {
    ReceiverNonBlocking = 0,
    ReceiverBlocking = 1
  };
65
66
  /**
67
   * The sender's end of a channel used to pass objects between threads.
68
   *
69
   * A sender can be used by several threads in a safe way.
70
   */
71
  template <typename T, typename D = std::default_delete<T>>
72
  class Sender
73
  {
74
  public:
75
    Sender() = default;
76
    Sender(FDWrapper&& descriptor) :
77
      d_fd(std::move(descriptor))
78
    {
79
    }
80
    Sender(const Sender&) = delete;
81
    Sender& operator=(const Sender&) = delete;
82
    Sender(Sender&&) = default;
83
    Sender& operator=(Sender&&) = default;
84
    ~Sender() = default;
85
    /**
86
     * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
87
     *
88
     * \return True if the object was properly sent, False if the channel is full.
89
     *
90
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
91
     */
92
    bool send(std::unique_ptr<T, D>&&) const;
93
    void close();
94
95
  private:
96
    FDWrapper d_fd;
97
  };
98
99
  /**
100
   * The receiver's end of a channel used to pass objects between threads.
101
   *
102
   * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen.
103
   */
104
  template <typename T, typename D = std::default_delete<T>>
105
  class Receiver
106
  {
107
  public:
108
    Receiver() = default;
109
    Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
110
      d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
111
    {
112
    }
113
    Receiver(const Receiver&) = delete;
114
    Receiver& operator=(const Receiver&) = delete;
115
    Receiver(Receiver&&) = default;
116
    Receiver& operator=(Receiver&&) = default;
117
    ~Receiver() = default;
118
    /**
119
     * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
120
     *
121
     * \return An object if one was available, and std::nullopt otherwise.
122
     *
123
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
124
     */
125
    std::optional<std::unique_ptr<T, D>> receive();
126
    std::optional<std::unique_ptr<T, D>> receive(D deleter);
127
128
    /**
129
     * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
130
     *
131
     * \return A valid descriptor or -1 if the Receiver was not properly initialized.
132
     */
133
    int getDescriptor() const
134
    {
135
      return d_fd.getHandle();
136
    }
137
    /**
138
     * \brief Whether the remote end has closed the channel.
139
     */
140
    bool isClosed() const
141
    {
142
      return d_closed;
143
    }
144
145
  private:
146
    FDWrapper d_fd;
147
    bool d_closed{false};
148
    bool d_throwOnEOF{true};
149
  };
150
151
  /**
152
   * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers.
153
   *
154
   * \return A pair of Sender and Receiver objects.
155
   *
156
   * \throw runtime_error if the channel creation failed.
157
   */
158
  template <typename T, typename D = std::default_delete<T>>
159
  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);
160
161
  /**
162
   * The notifier's end of a channel used to communicate between threads.
163
   *
164
   * A notifier can be used by several threads in a safe way.
165
   */
166
  class Notifier
167
  {
168
  public:
169
    Notifier() = default;
170
    Notifier(FDWrapper&&);
171
    Notifier(const Notifier&) = delete;
172
    Notifier& operator=(const Notifier&) = delete;
173
0
    Notifier(Notifier&&) = default;
174
    Notifier& operator=(Notifier&&) = default;
175
0
    ~Notifier() = default;
176
177
    /**
178
     * \brief Queue a notification to wake up the other end of the channel.
179
     *
180
     * \return True if the notification was properly sent, False if the channel is full.
181
     *
182
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
183
     */
184
    bool notify() const;
185
186
  private:
187
    FDWrapper d_fd;
188
  };
189
190
  /**
191
   * The waiter's end of a channel used to communicate between threads.
192
   *
193
   * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen.
194
   */
195
  class Waiter
196
  {
197
  public:
198
    Waiter() = default;
199
    Waiter(FDWrapper&&, bool throwOnEOF = true);
200
    Waiter(const Waiter&) = delete;
201
    Waiter& operator=(const Waiter&) = delete;
202
0
    Waiter(Waiter&&) = default;
203
    Waiter& operator=(Waiter&&) = default;
204
0
    ~Waiter() = default;
205
206
    /**
207
     * \brief Clear all notifications queued on that channel, if any.
208
     */
209
    void clear();
210
    /**
211
     * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive.
212
     *
213
     * \return A valid descriptor or -1 if the Waiter was not properly initialized.
214
     */
215
    int getDescriptor() const;
216
    /**
217
     * \brief Whether the remote end has closed the channel.
218
     */
219
    bool isClosed() const
220
0
    {
221
0
      return d_closed;
222
0
    }
223
224
  private:
225
    FDWrapper d_fd;
226
    bool d_closed{false};
227
    bool d_throwOnEOF{true};
228
  };
229
230
  /**
231
   * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers.
232
   *
233
   * \return A pair of Notifier and Sender objects.
234
   *
235
   * \throw runtime_error if the channel creation failed.
236
   */
237
  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
238
239
  template <typename T, typename D>
240
  bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
241
  {
242
    /* we cannot touch the initial unique pointer after writing to the pipe,
243
       not even to release it, so let's transfer it to a local object */
244
    auto localObj = std::move(object);
245
    auto ptr = localObj.get();
246
    static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
247
    while (true) {
248
#if __SANITIZE_THREAD__
249
      __tsan_release(ptr);
250
#endif /* __SANITIZE_THREAD__ */
251
      ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
252
253
      if (sent == sizeof(ptr)) {
254
        // coverity[leaked_storage]
255
        localObj.release();
256
        return true;
257
      }
258
      else if (sent == 0) {
259
#if __SANITIZE_THREAD__
260
        __tsan_acquire(ptr);
261
#endif /* __SANITIZE_THREAD__ */
262
        throw std::runtime_error("Unable to write to channel: remote end has been closed");
263
      }
264
      else {
265
#if __SANITIZE_THREAD__
266
        __tsan_acquire(ptr);
267
#endif /* __SANITIZE_THREAD__ */
268
        if (errno == EINTR) {
269
          continue;
270
        }
271
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
272
          object = std::move(localObj);
273
          return false;
274
        }
275
        else {
276
          throw std::runtime_error("Unable to write to channel:" + stringerror());
277
        }
278
      }
279
    }
280
  }
281
282
  template <typename T, typename D>
283
  void Sender<T, D>::close()
284
  {
285
    d_fd.reset();
286
  }
287
288
  template <typename T, typename D>
289
  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
290
  {
291
    return receive(D());
292
  }
293
294
  template <typename T, typename D>
295
  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
296
  {
297
    while (true) {
298
      std::optional<std::unique_ptr<T, D>> result;
299
      T* objPtr{nullptr};
300
      ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr));
301
      if (got == sizeof(objPtr)) {
302
#if __SANITIZE_THREAD__
303
        __tsan_acquire(objPtr);
304
#endif /* __SANITIZE_THREAD__ */
305
        return std::unique_ptr<T, D>(objPtr, deleter);
306
      }
307
      else if (got == 0) {
308
        d_closed = true;
309
        if (!d_throwOnEOF) {
310
          return result;
311
        }
312
        throw std::runtime_error("EOF while reading from Channel receiver");
313
      }
314
      else if (got == -1) {
315
        if (errno == EINTR) {
316
          continue;
317
        }
318
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
319
          return result;
320
        }
321
        throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
322
      }
323
      else {
324
        throw std::runtime_error("Partial read from Channel receiver");
325
      }
326
    }
327
  }
328
329
  template <typename T, typename D>
330
  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
331
  {
332
    int fds[2] = {-1, -1};
333
    if (pipe(fds) < 0) {
334
      throw std::runtime_error("Error creating channel pipe: " + stringerror());
335
    }
336
337
    FDWrapper sender(fds[1]);
338
    FDWrapper receiver(fds[0]);
339
    if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
340
      int err = errno;
341
      throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
342
    }
343
344
    if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
345
      int err = errno;
346
      throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
347
    }
348
349
    if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) {
350
      setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
351
    }
352
353
    return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)};
354
  }
355
}
356
}