Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/dom/media/AsyncLogger.h
Line
Count
Source (jump to first uncovered line)
1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
 * License, v. 2.0. If a copy of the MPL was not distributed with this
3
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5
/* Implementation of an asynchronous lock-free logging system. */
6
7
#ifndef mozilla_dom_AsyncLogger_h
8
#define mozilla_dom_AsyncLogger_h
9
10
#include <atomic>
11
#include <thread>
12
#include "mozilla/Logging.h"
13
#include "mozilla/Attributes.h"
14
#include "mozilla/MathAlgorithms.h"
15
#include "mozilla/Sprintf.h"
16
17
namespace mozilla {
18
19
namespace detail {
20
21
 // This class implements a lock-free multiple producer single consumer queue of
22
 // fixed size log messages, with the following characteristics:
23
 // - Unbounded (uses a intrinsic linked list)
24
 // - Allocates on Push. Push can be called on any thread.
25
 // - Deallocates on Pop. Pop MUST always be called on the same thread for the
26
 // life-time of the queue.
27
 //
28
 // In our scenario, the producer threads are real-time, they can't block. The
29
 // consummer thread runs every now and then and empties the queue to a log
30
 // file, on disk.
31
 //
32
 // Having fixed size messages and jemalloc is probably not the fastest, but
33
 // allows having a simpler design, we count on the fact that jemalloc will get
34
 // the memory from a thread-local source most of the time.
35
template<size_t MESSAGE_LENGTH>
36
class MPSCQueue
37
{
38
public:
39
    struct Message {
40
        Message()
41
3
        {
42
3
           mNext.store(nullptr, std::memory_order_relaxed);
43
3
        }
44
        Message(const Message& aMessage) = delete;
45
        void operator=(const Message& aMessage) = delete;
46
47
        char data[MESSAGE_LENGTH];
48
        std::atomic<Message*> mNext;
49
    };
50
    // Creates a new MPSCQueue. Initially, the queue has a single sentinel node,
51
    // pointed to by both mHead and mTail.
52
    MPSCQueue()
53
    // At construction, the initial message points to nullptr (it has no
54
    // successor). It is a sentinel node, that does not contain meaningful
55
    // data.
56
    : mHead(new Message())
57
    , mTail(mHead.load(std::memory_order_relaxed))
58
3
    { }
59
60
    ~MPSCQueue()
61
0
    {
62
0
        Message dummy;
63
0
        while (this->Pop(dummy.data)) {}
64
0
        Message* front = mHead.load(std::memory_order_relaxed);
65
0
        delete front;
66
0
    }
67
68
    void
69
    Push(MPSCQueue<MESSAGE_LENGTH>::Message* aMessage)
70
0
    {
71
0
        // The next two non-commented line are called A and B in this paragraph.
72
0
        // Producer threads i, i-1, etc. are numbered in the order they reached
73
0
        // A in time, thread i being the thread that has reached A first.
74
0
        // Atomically, on line A the new `mHead` is set to be the node that was
75
0
        // just allocated, with strong memory order. From now one, any thread
76
0
        // that reaches A will see that the node just allocated is
77
0
        // effectively the head of the list, and will make itself the new head
78
0
        // of the list.
79
0
        // In a bad case (when thread i executes A and then
80
0
        // is not scheduled for a long time), it is possible that thread i-1 and
81
0
        // subsequent threads create a seemingly disconnected set of nodes, but
82
0
        // they all have the correct value for the next node to set as their
83
0
        // mNext member on their respective stacks (in `prev`), and this is
84
0
        // always correct. When the scheduler resumes, and line B is executed,
85
0
        // the correct linkage is resumed.
86
0
        // Before line B, since mNext for the node the was the last element of
87
0
        // the queue still has an mNext of nullptr, Pop will not see the node
88
0
        // added.
89
0
        // For line A, it's critical to have strong ordering both ways (since
90
0
        // it's going to possibly be read and write repeatidly by multiple
91
0
        // threads)
92
0
        // Line B can have weaker guarantees, it's only going to be written by a
93
0
        // single thread, and we just need to ensure it's read properly by a
94
0
        // single other one.
95
0
        Message* prev = mHead.exchange(aMessage, std::memory_order_acq_rel);
96
0
        prev->mNext.store(aMessage, std::memory_order_release);
97
0
    }
98
99
    // Allocates a new node, copy aInput to the new memory location, and pushes
100
    // it to the end of the list.
101
    void
102
    Push(const char aInput[MESSAGE_LENGTH])
103
    {
104
        // Create a new message, and copy the messages passed on argument to the
105
        // new memory location. We are not touching the queue right now. The
106
        // successor for this new node is set to be nullptr.
107
        Message* msg = new Message();
108
        strncpy(msg->data, aInput, MESSAGE_LENGTH);
109
110
        Push(msg);
111
    }
112
113
    // Copy the content of the first message of the queue to aOutput, and
114
    // frees the message. Returns true if there was a message, in which case
115
    // `aOutput` contains a valid value. If the queue was empty, returns false,
116
    // in which case `aOutput` is left untouched.
117
    bool
118
    Pop(char aOutput[MESSAGE_LENGTH])
119
0
    {
120
0
        // Similarly, in this paragraph, the two following lines are called A
121
0
        // and B, and threads are called thread i, i-1, etc. in order of
122
0
        // execution of line A.
123
0
        // On line A, the first element of the queue is acquired. It is simply a
124
0
        // sentinel node.
125
0
        // On line B, we acquire the node that has the data we want. If B is
126
0
        // null, then only the sentinel node was present in the queue, we can
127
0
        // safely return false.
128
0
        // mTail can be loaded with relaxed ordering, since it's not written nor
129
0
        // read by any other thread (this queue is single consumer).
130
0
        // mNext can be written to by one of the producer, so it's necessary to
131
0
        // ensure those writes are seen, hence the stricter ordering.
132
0
        Message* tail = mTail.load(std::memory_order_relaxed);
133
0
        Message* next = tail->mNext.load(std::memory_order_acquire);
134
0
135
0
        if (next == nullptr) {
136
0
            return false;
137
0
        }
138
0
139
0
        strncpy(aOutput, next->data, MESSAGE_LENGTH);
140
0
141
0
        // Simply shift the queue one node further, so that the sentinel node is
142
0
        // now pointing to the correct most ancient node. It contains stale data,
143
0
        // but this data will never be read again.
144
0
        // It's only necessary to ensure the previous load on this thread is not
145
0
        // reordered past this line, so release ordering is sufficient here.
146
0
        mTail.store(next, std::memory_order_release);
147
0
148
0
        // This thread is now the only thing that points to `tail`, it can be
149
0
        // safely deleted.
150
0
        delete tail;
151
0
152
0
        return true;
153
0
    }
154
155
private:
156
    // An atomic pointer to the most recent message in the queue.
157
    std::atomic<Message*> mHead;
158
    // An atomic pointer to a sentinel node, that points to the oldest message
159
    // in the queue.
160
    std::atomic<Message*> mTail;
161
162
    MPSCQueue(const MPSCQueue&) = delete;
163
    void operator=(const MPSCQueue&) = delete;
164
public:
165
    // The goal here is to make it easy on the allocator. We pack a pointer in the
166
    // message struct, and we still want to do power of two allocations to
167
    // minimize allocator slop. The allocation size are going to be constant, so
168
    // the allocation is probably going to hit the thread local cache in jemalloc,
169
    // making it cheap and, more importantly, lock-free enough.
170
    static const size_t MESSAGE_PADDING = sizeof(Message::mNext);
171
private:
172
    static_assert(IsPowerOfTwo(MESSAGE_LENGTH + MESSAGE_PADDING),
173
                  "MPSCQueue internal allocations must have a size that is a"
174
                  "power of two ");
175
};
176
} // end namespace detail
177
178
// This class implements a lock-free asynchronous logger, that outputs to
179
// MOZ_LOG.
180
// Any thread can use this logger without external synchronization and without
181
// being blocked. This log is suitable for use in real-time audio threads.
182
// Log formatting is best done externally, this class implements the output
183
// mechanism only.
184
// This class uses a thread internally, and must be started and stopped
185
// manually.
186
// If logging is disabled, all the calls are no-op.
187
class AsyncLogger
188
{
189
public:
190
  static const uint32_t MAX_MESSAGE_LENGTH = 512 - detail::MPSCQueue<sizeof(void*)>::MESSAGE_PADDING;
191
192
  // aLogModuleName is the name of the MOZ_LOG module.
193
  explicit AsyncLogger(const char* aLogModuleName)
194
  : mThread(nullptr)
195
  , mLogModule(aLogModuleName)
196
  , mRunning(false)
197
3
  { }
198
199
  ~AsyncLogger()
200
0
  {
201
0
    if (Enabled()) {
202
0
      Stop();
203
0
    }
204
0
  }
205
206
  void Start()
207
0
  {
208
0
    MOZ_ASSERT(!mRunning, "Double calls to AsyncLogger::Start");
209
0
    if (Enabled()) {
210
0
      mRunning = true;
211
0
      Run();
212
0
    }
213
0
  }
214
215
  void Stop()
216
0
  {
217
0
    if (Enabled()) {
218
0
      if (mRunning) {
219
0
        mRunning = false;
220
0
        mThread->join();
221
0
      }
222
0
    } else {
223
0
      MOZ_ASSERT(!mRunning && !mThread);
224
0
    }
225
0
  }
226
227
  void Log(const char* format, ...) MOZ_FORMAT_PRINTF(2,3)
228
0
  {
229
0
    if (Enabled()) {
230
0
      auto* msg = new detail::MPSCQueue<MAX_MESSAGE_LENGTH>::Message();
231
0
      va_list args;
232
0
      va_start(args, format);
233
0
      VsprintfLiteral(msg->data, format, args);
234
0
      va_end(args);
235
0
      mMessageQueue.Push(msg);
236
0
    }
237
0
  }
238
239
  bool Enabled()
240
0
  {
241
0
    return MOZ_LOG_TEST(mLogModule, mozilla::LogLevel::Verbose);
242
0
  }
243
244
private:
245
  void Run()
246
0
  {
247
0
    MOZ_ASSERT(Enabled());
248
0
    mThread.reset(new std::thread([this]() {
249
0
      while (mRunning) {
250
0
        char message[MAX_MESSAGE_LENGTH];
251
0
        while (mMessageQueue.Pop(message) && mRunning) {
252
0
          MOZ_LOG(mLogModule, mozilla::LogLevel::Verbose, ("%s", message));
253
0
        }
254
0
        Sleep();
255
0
      }
256
0
    }));
257
0
  }
258
259
0
  void Sleep() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); }
260
261
  std::unique_ptr<std::thread> mThread;
262
  mozilla::LazyLogModule mLogModule;
263
  detail::MPSCQueue<MAX_MESSAGE_LENGTH> mMessageQueue;
264
  std::atomic<bool> mRunning;
265
};
266
267
} // end namespace mozilla
268
269
#endif // mozilla_dom_AsyncLogger_h